diff --git a/docs/Privacy.md b/docs/Privacy.md index 95aee00b0b..f7bd8a0127 100644 --- a/docs/Privacy.md +++ b/docs/Privacy.md @@ -6,11 +6,15 @@ The software may collect information about you and your use of the software and *** ## Technical Details -Olive uses the [OpenTelemetry](https://opentelemetry.io/) API for its implementation. Telemetry is turned ON by default. Based on user consent, this data may be periodically sent to Microsoft servers following GDPR and privacy regulations for anonymity and data access controls. Application, device, and version information is collected automatically. +Telemetry is turned ON by default. Based on user consent, this data may be periodically sent to Microsoft servers following GDPR and privacy regulations for anonymity and data access controls. Application, device, and version information is collected automatically. In addition, Olive may collect additional telemetry data such as: - Invoked commands - Performance data - Exception information -Collection of this additional telemetry can be disabled by adding the `--disable_telemetry` flag to any Olive CLI command, or by setting the `OLIVE_DISABLE_TELEMETRY` environment variable to `1` before running. Telemetry is also automatically disabled when a CI/CD environment is detected (e.g., GitHub Actions, Azure Pipelines, Jenkins). If telemetry is enabled, but cannot be sent to Microsoft, it will be stored locally and sent when a connection is available. You can override the default cache location by setting the `OLIVE_TELEMETRY_CACHE_DIR` environment variable to a valid directory path. +You can disable telemetry by adding the `--disable_telemetry` flag to any Olive CLI command, or by setting the `OLIVE_DISABLE_TELEMETRY` environment variable to `1` before running. When telemetry is disabled this way, the additional telemetry above (commands, performance, exceptions) is not sent. A minimal device-id heartbeat — a non-reversible hashed device identifier plus basic operating-system name, version, release, and architecture — is still sent outside CI/CD environments so Microsoft can count active devices; it contains no command, performance, or exception data. + +In CI/CD environments (e.g., GitHub Actions, Azure Pipelines, Jenkins), Olive suppresses the device-id heartbeat and the action/error events and only emits the `OliveRecipe` event. The `OliveRecipe` event may include recipe metadata such as pass types, explicitly configured target settings, the host system type (including the default `LocalSystem` host) and any explicitly configured host accelerator settings, whether a custom package config was provided, a redacted snapshot of custom package-config overrides, and a redacted snapshot of explicitly supplied config overrides. Setting `OLIVE_DISABLE_TELEMETRY=1` in a CI/CD environment sends nothing at all. + +Telemetry is implemented using only the Python standard library. Events are written to a local per-user SQLite queue and uploaded in the background to Microsoft over HTTPS. If telemetry is enabled but cannot be sent (for example, while offline), events remain in the local queue and are uploaded on a later run when a connection is available. diff --git a/olive/cli/base.py b/olive/cli/base.py index e803311f27..289f2c39be 100644 --- a/olive/cli/base.py +++ b/olive/cli/base.py @@ -115,7 +115,7 @@ def _run_workflow(self): mark_test_output_path(self.args.output_path) print("Dry run mode enabled. Configuration file is generated but no optimization is performed.") return None - workflow_output = olive_run(run_config) + workflow_output = olive_run(run_config, recipe_telemetry_metadata=self._get_recipe_telemetry_metadata()) if getattr(self.args, "test", None) not in (None, False): mark_test_output_path(self.args.output_path) if not workflow_output.has_output_model(): @@ -124,6 +124,17 @@ def _run_workflow(self): print(f"Model is saved at {self.args.output_path}") return workflow_output + def _get_recipe_telemetry_metadata(self) -> dict[str, str]: + recipe_name = self.__class__.__name__ + if recipe_name.endswith("Command"): + recipe_name = recipe_name[: -len("Command")] + return { + "recipe_name": recipe_name, + "recipe_command": recipe_name, + "recipe_source": "generated_cli", + "recipe_format": "generated", + } + @staticmethod def _parse_extra_options(kv_items): from onnxruntime_genai import __version__ as OrtGenaiVersion diff --git a/olive/cli/launcher.py b/olive/cli/launcher.py index 55e6ffdeb4..332fd7eb20 100644 --- a/olive/cli/launcher.py +++ b/olive/cli/launcher.py @@ -2,6 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. # -------------------------------------------------------------------------- +import os import sys from argparse import ArgumentParser from warnings import warn @@ -66,9 +67,11 @@ def main(raw_args=None, called_as_console_script: bool = True): args, unknown_args = parser.parse_known_args(raw_args) - telemetry = Telemetry() + # Honor --disable-telemetry BEFORE constructing Telemetry, so a disabled run + # never starts the uploader or drains/uploads the durable store. if args.disable_telemetry: - telemetry.disable_telemetry() + os.environ["OLIVE_DISABLE_TELEMETRY"] = "1" + telemetry = Telemetry() if not hasattr(args, "func"): parser.print_help() diff --git a/olive/cli/run.py b/olive/cli/run.py index 2c94af6d41..7599d756a6 100644 --- a/olive/cli/run.py +++ b/olive/cli/run.py @@ -53,16 +53,22 @@ def register_subcommand(parser: ArgumentParser): @action def run(self): + from copy import deepcopy + from pathlib import Path + from olive.common.config_utils import load_config_file from olive.workflows import run as olive_run # allow the run_config to be a dict already (for api use) - run_config = self.args.run_config - if not isinstance(run_config, dict): - run_config = load_config_file(run_config) + run_config_input = self.args.run_config + run_config = ( + deepcopy(run_config_input) if isinstance(run_config_input, dict) else load_config_file(run_config_input) + ) + config_overrides = {} if input_model_config := get_input_model_config(self.args, required=False): print("Replacing input model config in run config") run_config["input_model"] = input_model_config + config_overrides["input_model"] = input_model_config elif self.args.test not in (None, False): input_model = run_config.get("input_model") if not isinstance(input_model, dict) or input_model.get("type", "").lower() != "hfmodel": @@ -79,6 +85,19 @@ def run(self): run_config.get("engine", {}).pop(rc_key, None) # add value to run config directly run_config[rc_key] = arg_value + config_overrides[rc_key] = arg_value + + recipe_telemetry_metadata = { + "recipe_command": "WorkflowRun", + "recipe_source": "config_dict" if isinstance(run_config_input, dict) else "config_file", + "recipe_format": "dict" + if isinstance(run_config_input, dict) + else Path(run_config_input).suffix.lstrip(".").lower() or "unknown", + "execution_mode": "list_required_packages" if self.args.list_required_packages else "run", + "package_config_provided": bool(self.args.package_config), + } + if config_overrides: + recipe_telemetry_metadata["config_overrides"] = config_overrides output_path = run_config.get("output_dir") or run_config.get("engine", {}).get("output_dir") validate_test_output_path(output_path, self.args.test) @@ -89,6 +108,7 @@ def run(self): list_required_packages=self.args.list_required_packages, tempdir=self.args.tempdir, package_config=self.args.package_config, + recipe_telemetry_metadata=recipe_telemetry_metadata, ) if self.args.test not in (None, False): mark_test_output_path(output_path) diff --git a/olive/systems/docker/docker_system.py b/olive/systems/docker/docker_system.py index 8371cccd44..2a479ec690 100644 --- a/olive/systems/docker/docker_system.py +++ b/olive/systems/docker/docker_system.py @@ -232,6 +232,8 @@ def _prepare_run_params(self) -> dict: def _prepare_environment(self, base_env) -> dict: """Prepare environment variables for container.""" + from olive.telemetry.telemetry import is_ci_environment + # Convert list to dict if needed if isinstance(base_env, list): environment = {env.split("=")[0]: env.split("=")[1] for env in base_env} @@ -241,6 +243,8 @@ def _prepare_environment(self, base_env) -> dict: # Add default environment variables environment.setdefault("PYTHONPYCACHEPREFIX", "/tmp") environment["OLIVE_LOG_LEVEL"] = logging.getLevelName(logger.getEffectiveLevel()) + if is_ci_environment(): + environment["CI"] = "1" # Add HuggingFace token if needed if self.hf_token: diff --git a/olive/systems/docker/workflow_runner.py b/olive/systems/docker/workflow_runner.py index 5842d0bd49..be0d59d671 100644 --- a/olive/systems/docker/workflow_runner.py +++ b/olive/systems/docker/workflow_runner.py @@ -20,7 +20,7 @@ def runner_entry(config): config = json.load(f) logger.info("Running workflow with config: %s", config) - olive_run(config) + olive_run(config, emit_recipe_telemetry=False) if __name__ == "__main__": diff --git a/olive/telemetry/constants.py b/olive/telemetry/constants.py index ca9e150b1b..25a60e813e 100644 --- a/olive/telemetry/constants.py +++ b/olive/telemetry/constants.py @@ -3,6 +3,6 @@ # Licensed under the MIT License. # -------------------------------------------------------------------------- -"""OneCollector connection string.""" +"""Telemetry constants.""" -CONNECTION_STRING = "SW5zdHJ1bWVudGF0aW9uS2V5PTlkNWRkYWVjNjFlMjQ1NjdiNzg4YTIwYWVhMzI0NjMxLTcyMzdkN2M2LWVlNjEtNGNmZC1iYjdiLTU5MDNhOTcyYzJlNC03MDQ3" +CONNECTION_STRING = "SW5zdHJ1bWVudGF0aW9uS2V5PTYyMTUwOTExZGMwMDRmYzliYjY3YmE5NjA2NDI3ZTU2LWVjNjFmOWFmLTVkN2EtNGQxOS1hZjMxLWI5Y2Q2OWU5ODdmMS02OTE1" diff --git a/olive/telemetry/library/__init__.py b/olive/telemetry/library/__init__.py index 39831da66e..fa6d95b124 100644 --- a/olive/telemetry/library/__init__.py +++ b/olive/telemetry/library/__init__.py @@ -3,62 +3,25 @@ # Licensed under the MIT License. # -------------------------------------------------------------------------- -"""OneCollector Exporter for OpenTelemetry Python. +"""OneCollector building blocks (standard library only). -This package provides an OpenTelemetry exporter that sends telemetry data -to Microsoft OneCollector using the Common Schema JSON format. - -Example usage: - - from onecollector_exporter import ( - OneCollectorLogExporter, - OneCollectorExporterOptions, - get_telemetry_logger, - ) - - # Option 1: Use with OpenTelemetry SDK directly - options = OneCollectorExporterOptions( - connection_string="InstrumentationKey=your-key-here" - ) - exporter = OneCollectorLogExporter(options=options) - - # Add to logger provider - from opentelemetry.sdk._logs import LoggerProvider - from opentelemetry.sdk._logs.export import BatchLogRecordProcessor - - provider = LoggerProvider() - provider.add_log_record_processor(BatchLogRecordProcessor(exporter)) - - # Option 2: Use the simplified telemetry logger - logger = get_telemetry_logger( - connection_string="InstrumentationKey=your-key-here" - ) - logger.log("MyEvent", {"key": "value"}) - logger.shutdown() +Helpers for serializing telemetry to Common Schema JSON and posting it to the +Microsoft OneCollector endpoint. These modules have no third-party dependency +and are driven directly by the SQLite-backed uploader. """ -from olive.telemetry.library.callback_manager import CallbackManager, PayloadTransmittedCallbackArgs -from olive.telemetry.library.connection_string_parser import ConnectionStringParser -from olive.telemetry.library.event_source import OneCollectorEventId, OneCollectorEventSource, event_source -from olive.telemetry.library.exporter import OneCollectorLogExporter -from olive.telemetry.library.options import ( +from .callback_manager import CallbackManager, PayloadTransmittedCallbackArgs +from .connection_string_parser import ConnectionStringParser +from .event_source import OneCollectorEventId, OneCollectorEventSource, event_source +from .options import ( CompressionType, OneCollectorExporterOptions, OneCollectorExporterValidationError, OneCollectorTransportOptions, ) -from olive.telemetry.library.payload_builder import PayloadBuilder -from olive.telemetry.library.retry import RetryHandler -from olive.telemetry.library.serialization import CommonSchemaJsonSerializationHelper -from olive.telemetry.library.telemetry_logger import ( - TelemetryLogger, - get_telemetry_logger, - log_event, - shutdown_telemetry, -) -from olive.telemetry.library.transport import HttpJsonPostTransport, ITransport - -__version__ = "0.0.1" +from .payload_builder import PayloadBuilder +from .serialization import CommonSchemaJsonSerializationHelper +from .transport import HttpJsonPostTransport, ITransport __all__ = [ "CallbackManager", @@ -71,14 +34,8 @@ "OneCollectorEventSource", "OneCollectorExporterOptions", "OneCollectorExporterValidationError", - "OneCollectorLogExporter", "OneCollectorTransportOptions", "PayloadBuilder", "PayloadTransmittedCallbackArgs", - "RetryHandler", - "TelemetryLogger", "event_source", - "get_telemetry_logger", - "log_event", - "shutdown_telemetry", ] diff --git a/olive/telemetry/library/exporter.py b/olive/telemetry/library/exporter.py deleted file mode 100644 index 045647fe9d..0000000000 --- a/olive/telemetry/library/exporter.py +++ /dev/null @@ -1,335 +0,0 @@ -# ------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -# -------------------------------------------------------------------------- - -"""Main OneCollector log exporter implementation.""" - -import threading -from collections.abc import Sequence -from datetime import datetime, timezone -from time import time -from typing import TYPE_CHECKING, Any, Callable, Optional - -import requests -from opentelemetry.sdk._logs import ReadableLogRecord -from opentelemetry.sdk._logs.export import LogExportResult, LogRecordExporter -from opentelemetry.sdk.resources import Resource - -from olive.telemetry.library.callback_manager import CallbackManager -from olive.telemetry.library.event_source import event_source -from olive.telemetry.library.options import OneCollectorExporterOptions -from olive.telemetry.library.payload_builder import PayloadBuilder -from olive.telemetry.library.retry import RetryHandler -from olive.telemetry.library.serialization import CommonSchemaJsonSerializationHelper -from olive.telemetry.library.transport import HttpJsonPostTransport - -if TYPE_CHECKING: - from olive.telemetry.library.callback_manager import PayloadTransmittedCallbackArgs - - -class OneCollectorLogExporter(LogRecordExporter): - """OpenTelemetry log exporter for Microsoft OneCollector. - - Implements the OpenTelemetry LogRecordExporter interface and sends logs - to OneCollector using the Common Schema JSON format. - """ - - def __init__( - self, - options: Optional[OneCollectorExporterOptions] = None, - excluded_attributes: Optional[set[str]] = None, - ): - """Initialize the OneCollector log exporter. - - Args: - options: Exporter configuration options - excluded_attributes: Attribute keys to exclude from log attributes - - """ - # Validate options - if options is None: - raise ValueError("OneCollectorExporterOptions is required") - options.validate() - - self._options = options - self._shutdown_lock = threading.Lock() - self._shutdown = False - self._shutdown_event = threading.Event() - if excluded_attributes is None: - self._excluded_attributes = { - "code.filepath", - "code.function", - "code.lineno", - "code.file.path", - "code.function.name", - "code.line.number", - } - else: - self._excluded_attributes = set(excluded_attributes) - - # Initialize transport - transport_opts = options.transport_options - - # Create or get HTTP session - if transport_opts.http_client_factory: - self._session = transport_opts.http_client_factory() - self._owns_session = False - else: - self._session = requests.Session() - self._owns_session = True - - try: - # Build iKey with tenant prefix - self._ikey = f"{CommonSchemaJsonSerializationHelper.ONE_COLLECTOR_TENANCY_SYMBOL}:{options.tenant_token}" - - # Initialize callback manager - self._callback_manager = CallbackManager() - - # Initialize transport with callback manager - self._transport = HttpJsonPostTransport( - endpoint=transport_opts.endpoint, - ikey=options.instrumentation_key, - compression=transport_opts.compression, - session=self._session, - callback_manager=self._callback_manager, - ) - - # Initialize payload builder - self._payload_builder = PayloadBuilder( - max_size_bytes=transport_opts.max_payload_size_bytes, max_items=transport_opts.max_items_per_payload - ) - - # Initialize retry handler - self._retry_handler = RetryHandler(max_retries=6) - - # Initialize metadata - self._metadata: dict[str, Any] = {} - - # Cache for resource (populated on first export) - self._resource: Optional[Resource] = None - except Exception: - if self._owns_session: - self._session.close() - raise - - def add_metadata(self, metadata: dict[str, Any]) -> None: - """Add custom metadata fields to all exported logs. - - Args: - metadata: Dictionary of metadata fields to add - - """ - self._metadata.update(metadata) - - def register_payload_transmitted_callback( - self, callback: Callable[["PayloadTransmittedCallbackArgs"], None], include_failures: bool = False - ) -> Callable[[], None]: - """Register a callback that will be invoked on payload transmission. - - Callbacks are invoked after each HTTP request completes. If retries are - enabled, callbacks will be invoked for each retry attempt. - - Args: - callback: Function to call when payload is transmitted. - Receives PayloadTransmittedCallbackArgs with transmission details. - include_failures: If True, callback is invoked on both success and failure. - If False, callback is only invoked on success. - - Returns: - Function to call to unregister the callback. - - Example: - >>> def on_transmitted(args): - ... if args.succeeded: - ... print(f"✅ Sent {args.item_count} items ({args.payload_size_bytes} bytes)") - ... else: - ... print(f"❌ Failed: status={args.status_code}") - >>> - >>> unregister = exporter.register_payload_transmitted_callback( - ... on_transmitted, - ... include_failures=True - ... ) - >>> # Later: unregister() - - """ - return self._transport.register_payload_transmitted_callback(callback, include_failures) - - def export(self, batch: Sequence[ReadableLogRecord]) -> LogExportResult: - """Export a batch of log records. - - Args: - batch: Sequence of log data records to export - - Returns: - LogExportResult indicating success or failure - - """ - if self._shutdown: - return LogExportResult.FAILURE - - try: - # Get resource (cache for subsequent calls) - if self._resource is None: - first_item = batch[0] if batch else None - resource = getattr(first_item, "resource", None) - if resource is None and first_item is not None: - resource = getattr(first_item.log_record, "resource", None) - self._resource = resource or Resource.create() - - # Serialize log records to JSON - serialized_items = [] - for log_data in batch: - try: - item_bytes = self._serialize_log_data(log_data) - serialized_items.append(item_bytes) - except Exception as ex: - event_source.export_exception_thrown("ReadableLogRecord", ex) - # Continue with other items - - if not serialized_items: - return LogExportResult.FAILURE - - # Build payloads respecting size/count limits - payloads = self._build_payloads(serialized_items) - - # Send each payload with retry logic - deadline_sec = time() + self._options.transport_options.timeout_seconds - - for payload in payloads: - # Count items in this payload (approximation based on newlines) - item_count = payload.count(b"\n") + 1 if payload else 0 - success = self._retry_handler.execute_with_retry( - operation=lambda payload=payload, item_count=item_count: self._transport.send( - payload, max(0.1, deadline_sec - time()), item_count=item_count - ), - deadline_sec=deadline_sec, - shutdown_event=self._shutdown_event, - ) - - if not success: - return LogExportResult.FAILURE - - # Check if shutdown occurred - if self._shutdown: - return LogExportResult.FAILURE - - # Log success - event_source.sink_data_written("ReadableLogRecord", len(batch), "OneCollector") - - return LogExportResult.SUCCESS - - except Exception as ex: - event_source.export_exception_thrown("ReadableLogRecord", ex) - return LogExportResult.FAILURE - - def _serialize_log_data(self, log_data: ReadableLogRecord) -> bytes: - """Serialize a single log record to JSON bytes. - - Args: - log_data: Log data to serialize - - Returns: - UTF-8 encoded JSON bytes - - """ - log_record = log_data.log_record - - # Build data dictionary - data = {} - - # Add resource attributes (if available) - if self._resource and self._resource.attributes: - for key, value in self._resource.attributes.items(): - # Map common resource attributes - if key == "service.name" and "app_name" not in data: - data["app_name"] = value - elif key == "service.version" and "app_version" not in data: - data["app_version"] = value - elif key == "service.instance.id" and "app_instance_id" not in data: - data["app_instance_id"] = value - else: - data[key] = value - - # Add log record attributes (override resource attributes) - if log_record.attributes: - data.update( - {key: value for key, value in log_record.attributes.items() if key not in self._excluded_attributes} - ) - - # Add custom metadata - data.update(self._metadata) - - # Format timestamp - if log_record.timestamp: - timestamp = datetime.fromtimestamp(log_record.timestamp / 1e9, tz=timezone.utc) - else: - timestamp = datetime.now(timezone.utc) - - # Create event envelope - event_name = str(log_record.body) if log_record.body else "UnnamedEvent" - - envelope = CommonSchemaJsonSerializationHelper.create_event_envelope( - event_name=event_name, timestamp=timestamp, ikey=self._ikey, data=data - ) - - # Serialize to JSON bytes - return CommonSchemaJsonSerializationHelper.serialize_to_json_bytes(envelope) - - def _build_payloads(self, serialized_items: list[bytes]) -> list[bytes]: - """Build payloads from serialized items respecting size and count limits. - - Args: - serialized_items: List of serialized item bytes - - Returns: - List of payload bytes - - """ - payloads = [] - self._payload_builder.reset() - - for item_bytes in serialized_items: - if not self._payload_builder.can_add(item_bytes) and not self._payload_builder.is_empty: - # Current payload is full, build it and start a new one - payloads.append(self._payload_builder.build()) - self._payload_builder.reset() - - self._payload_builder.add(item_bytes) - - # Build final payload - if not self._payload_builder.is_empty: - payloads.append(self._payload_builder.build()) - - return payloads - - def force_flush(self, timeout_millis: float = 10_000) -> bool: - """Force flush any buffered data. - - Note: This exporter doesn't buffer data internally, so this is a no-op. - - Args: - timeout_millis: Timeout in milliseconds - - Returns: - True (always succeeds) - - """ - return True - - def shutdown(self) -> None: - """Shutdown the exporter and release resources.""" - with self._shutdown_lock: - if self._shutdown: - return - - self._shutdown = True - self._shutdown_event.set() - - # Close HTTP session (only if we own it) - if hasattr(self, "_session") and getattr(self, "_owns_session", True): - self._session.close() - - # Close callback manager - if hasattr(self, "_callback_manager"): - self._callback_manager.close() diff --git a/olive/telemetry/library/options.py b/olive/telemetry/library/options.py index dd934cad2d..92982c4c4c 100644 --- a/olive/telemetry/library/options.py +++ b/olive/telemetry/library/options.py @@ -7,11 +7,9 @@ from dataclasses import dataclass, field from enum import Enum -from typing import Callable, Optional +from typing import Optional -import requests - -from olive.telemetry.library.connection_string_parser import ConnectionStringParser +from .connection_string_parser import ConnectionStringParser class CompressionType(Enum): @@ -35,7 +33,6 @@ class OneCollectorTransportOptions: max_items_per_payload: int = DEFAULT_MAX_ITEMS_PER_PAYLOAD compression: CompressionType = CompressionType.DEFLATE timeout_seconds: float = 10.0 - http_client_factory: Optional[Callable[[], requests.Session]] = None def validate(self) -> None: """Validate the transport options. @@ -62,6 +59,7 @@ class OneCollectorExporterOptions: """Configuration options for OneCollector exporter.""" connection_string: Optional[str] = None + service_name: Optional[str] = None transport_options: OneCollectorTransportOptions = field(default_factory=OneCollectorTransportOptions) # Internal fields populated during validation diff --git a/olive/telemetry/library/retry.py b/olive/telemetry/library/retry.py deleted file mode 100644 index 9f0cc7cfd8..0000000000 --- a/olive/telemetry/library/retry.py +++ /dev/null @@ -1,98 +0,0 @@ -# ------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -# -------------------------------------------------------------------------- - -"""Retry logic with exponential backoff for OneCollector exporter.""" - -import random -import threading -from time import time -from typing import Callable, Optional - -from olive.telemetry.library.event_source import event_source -from olive.telemetry.library.transport import HttpJsonPostTransport - - -class RetryHandler: - """Handles retry logic with exponential backoff and jitter. - - Implements retry strategy matching the .NET implementation. - """ - - def __init__(self, max_retries: int = 6, base_delay: float = 1.0, max_delay: float = 60.0): - """Initialize retry handler. - - Args: - max_retries: Maximum number of retry attempts - base_delay: Base delay for exponential backoff (seconds) - max_delay: Maximum delay between retries (seconds) - - """ - self.max_retries = max_retries - self.base_delay = base_delay - self.max_delay = max_delay - - def execute_with_retry( - self, - operation: Callable[[], tuple[bool, Optional[int]]], - deadline_sec: float, - shutdown_event: threading.Event, - ) -> bool: - """Execute an operation with retry logic. - - Args: - operation: Function that returns (success, status_code) - deadline_sec: Absolute deadline timestamp - shutdown_event: Event to signal shutdown - - Returns: - True if operation succeeded, False otherwise - - """ - for retry_num in range(self.max_retries): - # Check if we've exceeded the deadline - remaining_time = deadline_sec - time() - if remaining_time <= 0: - return False - - try: - # Execute the operation - success, status_code = operation() - - if success: - return True - - # Check if response is retryable - if not HttpJsonPostTransport.is_retryable(status_code): - return False - - except Exception as ex: - event_source.export_exception_thrown("RetryHandler", ex) - - # Last retry - don't wait - if retry_num + 1 == self.max_retries: - return False - - # Last retry - failed - if retry_num + 1 == self.max_retries: - return False - - # Calculate backoff with exponential increase and jitter - backoff = min(self.base_delay * (2**retry_num), self.max_delay) - # Add +/-20% jitter - backoff *= random.uniform(0.8, 1.2) - - # Don't wait longer than remaining time - remaining_time = deadline_sec - time() - wait_time = min(backoff, remaining_time) - - if wait_time <= 0: - return False - - # Wait with ability to interrupt on shutdown - if shutdown_event.wait(wait_time): - # Shutdown occurred - return False - - return False diff --git a/olive/telemetry/library/telemetry_logger.py b/olive/telemetry/library/telemetry_logger.py deleted file mode 100644 index 7eb236e759..0000000000 --- a/olive/telemetry/library/telemetry_logger.py +++ /dev/null @@ -1,197 +0,0 @@ -# ------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. -# -------------------------------------------------------------------------- - -"""High-level telemetry logger facade for easy usage.""" - -import logging -import uuid -from typing import Any, Callable, Optional - -from opentelemetry._logs import set_logger_provider -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from opentelemetry.sdk.resources import Resource - -from olive.telemetry.library.exporter import OneCollectorLogExporter -from olive.telemetry.library.options import OneCollectorExporterOptions -from olive.version import __version__ as VERSION - - -class TelemetryLogger: - """Singleton telemetry logger for simplified OneCollector integration. - - Provides a simple interface for logging telemetry events without - needing to configure OpenTelemetry directly. - """ - - _instance: Optional["TelemetryLogger"] = None - _default_logger: Optional["TelemetryLogger"] = None - _logger: Optional[logging.Logger] = None - _logger_exporter: Optional[OneCollectorLogExporter] = None - _logger_provider: Optional[LoggerProvider] = None - - def __new__(cls, options: Optional[OneCollectorExporterOptions] = None): - """Create or return the singleton instance. - - Args: - options: Exporter options (only used on first instantiation) - - """ - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialize(options) - - return cls._instance - - def _initialize(self, options: Optional[OneCollectorExporterOptions]) -> None: - """Initialize the logger (called only once). - - Args: - options: Exporter configuration options - - """ - try: - # Create exporter - self._logger_exporter = OneCollectorLogExporter(options=options) - - # Create logger provider - self._logger_provider = LoggerProvider( - resource=Resource.create( - { - "service.name": __name__.split(".", maxsplit=1)[0], - "service.version": VERSION, - "service.instance.id": str(uuid.uuid4()), # Unique instance ID; can double as session ID - } - ) - ) - - # Set as global logger provider - set_logger_provider(self._logger_provider) - - # Add batch processor - self._logger_provider.add_log_record_processor( - BatchLogRecordProcessor( - self._logger_exporter, - schedule_delay_millis=1000, - ) - ) - - # Create logging handler - handler = LoggingHandler(level=logging.INFO, logger_provider=self._logger_provider) - - # Set up Python logger - logger = logging.getLogger(__name__) - logger.propagate = False - logger.setLevel(logging.INFO) - logger.addHandler(handler) - - self._logger = logger - - except Exception: - # Silently fail initialization - logger will be None - self._logger = None - self._logger_provider = None - self._logger_exporter = None - - def add_global_metadata(self, metadata: dict[str, Any]) -> None: - """Add metadata fields to all telemetry events. - - Args: - metadata: Dictionary of metadata to add - - """ - if self._logger_exporter: - self._logger_exporter.add_metadata(metadata) - - def register_payload_transmitted_callback( - self, callback, include_failures: bool = False - ) -> Optional[Callable[[], None]]: - """Register a callback for payload transmission events.""" - if self._logger_exporter: - return self._logger_exporter.register_payload_transmitted_callback(callback, include_failures) - return None - - def log(self, event_name: str, attributes: Optional[dict[str, Any]] = None) -> None: - """Log a telemetry event. - - Args: - event_name: Name of the event - attributes: Optional event attributes - - """ - if self._logger: - extra = attributes if attributes else {} - self._logger.info(event_name, extra=extra) - - def disable_telemetry(self) -> None: - """Disable telemetry logging.""" - if self._logger: - self._logger.disabled = True - - def enable_telemetry(self) -> None: - """Enable telemetry logging.""" - if self._logger: - self._logger.disabled = False - - def shutdown(self) -> None: - """Shutdown the telemetry logger and flush pending data.""" - if self._logger_provider: - self._logger_provider.shutdown() - - @classmethod - def get_default_logger(cls, connection_string: Optional[str] = None) -> "TelemetryLogger": - """Get or create the default telemetry logger. - - Args: - connection_string: OneCollector connection string (only used on first call) - - Returns: - TelemetryLogger instance - - """ - if cls._default_logger is None: - options = None - if connection_string: - options = OneCollectorExporterOptions(connection_string=connection_string) - cls._default_logger = cls(options=options) - - return cls._default_logger - - @classmethod - def shutdown_default_logger(cls) -> None: - """Shutdown the default telemetry logger.""" - if cls._default_logger: - cls._default_logger.shutdown() - cls._default_logger = None - - -def get_telemetry_logger(connection_string: Optional[str] = None) -> TelemetryLogger: - """Get or create the default telemetry logger. - - Args: - connection_string: OneCollector connection string (only used on first call) - - Returns: - TelemetryLogger instance - - """ - return TelemetryLogger.get_default_logger(connection_string=connection_string) - - -def log_event(event_name: str, attributes: Optional[dict[str, Any]] = None) -> None: - """Log a telemetry event using the default logger. - - Args: - event_name: Name of the event - attributes: Optional event attributes - - """ - logger = get_telemetry_logger() - logger.log(event_name, attributes) - - -def shutdown_telemetry() -> None: - """Shutdown the default telemetry logger.""" - TelemetryLogger.shutdown_default_logger() diff --git a/olive/telemetry/library/transport.py b/olive/telemetry/library/transport.py index 93db8ab27f..3d9bb302a3 100644 --- a/olive/telemetry/library/transport.py +++ b/olive/telemetry/library/transport.py @@ -3,21 +3,25 @@ # Licensed under the MIT License. # -------------------------------------------------------------------------- -"""HTTP transport implementation for OneCollector exporter.""" +"""HTTP transport for the OneCollector exporter (standard library only). + +Posts Common Schema JSON to the OneCollector endpoint using ``urllib`` so the +telemetry pipeline has no third-party dependency. +""" import gzip +import urllib.error +import urllib.request import zlib from abc import ABC, abstractmethod from io import BytesIO from typing import TYPE_CHECKING, Callable, Optional -import requests - -from olive.telemetry.library.event_source import event_source -from olive.telemetry.library.options import CompressionType +from .event_source import event_source +from .options import CompressionType if TYPE_CHECKING: - from olive.telemetry.library.callback_manager import CallbackManager, PayloadTransmittedCallbackArgs + from .callback_manager import CallbackManager, PayloadTransmittedCallbackArgs class ITransport(ABC): @@ -25,237 +29,131 @@ class ITransport(ABC): @abstractmethod def send(self, payload: bytes, timeout_sec: float, item_count: int = 1) -> tuple[bool, Optional[int]]: - """Send a payload. - - Args: - payload: The data to send - timeout_sec: Timeout in seconds - item_count: Number of items in the payload (for callbacks) - - Returns: - Tuple of (success, status_code) - - """ + """Send a payload. Returns (success, status_code).""" @abstractmethod def register_payload_transmitted_callback( self, callback: Callable[["PayloadTransmittedCallbackArgs"], None], include_failures: bool = False ) -> Callable[[], None]: - """Register a callback for payload transmission events. - - Args: - callback: Function to call when payload is transmitted - include_failures: Whether to invoke callback on failures - - Returns: - Function to call to unregister the callback - - """ + """Register a callback for payload transmission events.""" class HttpJsonPostTransport(ITransport): - """HTTP JSON POST transport implementation. - - Sends telemetry data to OneCollector via HTTP POST with JSON payload. - """ + """HTTP JSON POST transport using ``urllib`` (no third-party dependency).""" def __init__( self, endpoint: str, ikey: str, compression: CompressionType, - session: requests.Session, callback_manager: Optional["CallbackManager"] = None, - sdk_version: str = "OTel-python-1.0.0", + sdk_version: str = "py-genai-1.0.0", ): - """Initialize the HTTP transport. - - Args: - endpoint: OneCollector endpoint URL - ikey: Instrumentation key - compression: Compression type to use - session: Requests session for connection pooling - callback_manager: Optional callback manager for payload events - sdk_version: SDK version string - - """ self.endpoint = endpoint self.ikey = ikey self.compression = compression - self.session = session self.sdk_version = sdk_version self.callback_manager = callback_manager - # Build base headers self.headers = { "x-apikey": ikey, - "User-Agent": "Python/3 HttpClient", - "Host": "mobile.events.data.microsoft.com", + "User-Agent": "Python/3 urllib", "Content-Type": "application/x-json-stream; charset=utf-8", "sdk-version": sdk_version, "NoResponseBody": "true", } - if compression != CompressionType.NO_COMPRESSION: self.headers["Content-Encoding"] = compression.value def register_payload_transmitted_callback( self, callback: Callable[["PayloadTransmittedCallbackArgs"], None], include_failures: bool = False ) -> Callable[[], None]: - """Register a callback for payload transmission events. - - Args: - callback: Function to call when payload is transmitted - include_failures: Whether to invoke callback on failures - - Returns: - Function to call to unregister the callback - - """ if self.callback_manager is None: - # Import here to avoid circular dependency - from olive.telemetry.library.callback_manager import CallbackManager + from .callback_manager import CallbackManager self.callback_manager = CallbackManager() return self.callback_manager.register(callback, include_failures) def send(self, payload: bytes, timeout_sec: float, item_count: int = 1) -> tuple[bool, Optional[int]]: - """Send payload via HTTP POST. - - Args: - payload: Uncompressed payload bytes - timeout_sec: Request timeout in seconds - item_count: Number of items in the payload (for callbacks) - - Returns: - Tuple of (success, status_code) - - """ + """Send payload via HTTP POST. Returns (success, status_code).""" payload_size_bytes = len(payload) - try: - # Compress payload compressed_payload = self._compress(payload) - - # Update headers with content length headers = {**self.headers, "Content-Length": str(len(compressed_payload))} + request = urllib.request.Request( + url=self.endpoint, data=compressed_payload, headers=headers, method="POST" + ) - # Send request - try: - response = self.session.post( - url=self.endpoint, data=compressed_payload, headers=headers, timeout=timeout_sec - ) - except (requests.exceptions.ConnectionError, requests.exceptions.Timeout): - # Retry once on transient transport errors - response = self.session.post( - url=self.endpoint, data=compressed_payload, headers=headers, timeout=timeout_sec - ) - - # Check response - success = response.ok - status_code = response.status_code - - # Invoke callbacks - if self.callback_manager: - from olive.telemetry.library.callback_manager import PayloadTransmittedCallbackArgs + success, status_code = self._do_request(request, timeout_sec) - self.callback_manager.notify( - PayloadTransmittedCallbackArgs( - succeeded=success, - status_code=status_code, - payload_size_bytes=payload_size_bytes, - item_count=item_count, - payload_bytes=payload, - ) - ) + self._notify(success, status_code, payload_size_bytes, item_count, payload) if success: return True, status_code - else: - # Log error response - if event_source.is_error_logging_enabled: - collector_error = response.headers.get("Collector-Error", "") - error_details = response.text[:100] if response.text else "" - event_source.http_transport_error_response( - "HttpJsonPost", status_code, collector_error, error_details - ) - return False, status_code - - except requests.exceptions.Timeout: - # Invoke failure callbacks - if self.callback_manager: - from olive.telemetry.library.callback_manager import PayloadTransmittedCallbackArgs - - self.callback_manager.notify( - PayloadTransmittedCallbackArgs( - succeeded=False, - status_code=None, - payload_size_bytes=payload_size_bytes, - item_count=item_count, - payload_bytes=payload, - ) - ) + if event_source.is_error_logging_enabled and status_code is not None: + event_source.http_transport_error_response("HttpJsonPost", status_code, "", "") + return False, status_code - event_source.transport_exception_thrown("HttpJsonPost", Exception("Request timeout")) - return False, None except Exception as ex: - # Invoke failure callbacks - if self.callback_manager: - from olive.telemetry.library.callback_manager import PayloadTransmittedCallbackArgs - - self.callback_manager.notify( - PayloadTransmittedCallbackArgs( - succeeded=False, - status_code=None, - payload_size_bytes=payload_size_bytes, - item_count=item_count, - payload_bytes=payload, - ) - ) - + self._notify(False, None, payload_size_bytes, item_count, payload) event_source.transport_exception_thrown("HttpJsonPost", ex) return False, None - def _compress(self, data: bytes) -> bytes: - """Compress data according to configured compression type. - - Args: - data: Uncompressed data - - Returns: - Compressed data + @staticmethod + def _do_request(request: "urllib.request.Request", timeout_sec: float) -> tuple[bool, Optional[int]]: + """Perform the request, retrying once on a transient connection error.""" + for attempt in range(2): + try: + with urllib.request.urlopen(request, timeout=timeout_sec) as response: + response.read() + status = getattr(response, "status", response.getcode()) + return (200 <= status < 300, status) + except urllib.error.HTTPError as http_err: + # Server responded with a non-2xx status (4xx/5xx): not retried here. + try: + http_err.read() + except Exception: + pass + return (False, http_err.code) + except (urllib.error.URLError, TimeoutError, OSError): + # Connection-level failure: retry once, then give up. + if attempt == 0: + continue + return (False, None) + return (False, None) + + def _notify( + self, success: bool, status_code: Optional[int], payload_size_bytes: int, item_count: int, payload: bytes + ) -> None: + if not self.callback_manager: + return + from .callback_manager import PayloadTransmittedCallbackArgs + + self.callback_manager.notify( + PayloadTransmittedCallbackArgs( + succeeded=success, + status_code=status_code, + payload_size_bytes=payload_size_bytes, + item_count=item_count, + payload_bytes=payload, + ) + ) - """ + def _compress(self, data: bytes) -> bytes: if self.compression == CompressionType.DEFLATE: - # Raw deflate (no zlib header) compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS) - compressed = compressor.compress(data) - compressed += compressor.flush() - return compressed - + return compressor.compress(data) + compressor.flush() elif self.compression == CompressionType.GZIP: gzip_buffer = BytesIO() with gzip.GzipFile(fileobj=gzip_buffer, mode="w") as gzip_file: gzip_file.write(data) return gzip_buffer.getvalue() - - else: # NO_COMPRESSION - return data + return data @staticmethod def is_retryable(status_code: Optional[int]) -> bool: - """Check if a response status code indicates the request should be retried. - - Args: - status_code: HTTP status code, or None if request failed - - Returns: - True if request should be retried - - """ + """Whether a response status indicates the request should be retried.""" if status_code is None: return True # Network errors are retryable - - # Retryable status codes return status_code in {408, 429, 500, 502, 503, 504} diff --git a/olive/telemetry/offline_store.py b/olive/telemetry/offline_store.py new file mode 100644 index 0000000000..5651fe8b6d --- /dev/null +++ b/olive/telemetry/offline_store.py @@ -0,0 +1,144 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- + +"""SQLite-backed durable queue for telemetry events. + +A deliberately small subset of the Microsoft 1DS C++ SDK offline store +(cpp_client_telemetry/lib/offline/OfflineStorage_SQLite.cpp): a single FIFO +table of serialized event payloads. An uploader drains it, deleting rows on +success, dropping them on a permanent (non-retryable) send result, and leaving +them for the next attempt on a transient failure. Because every event is +written to disk before any network call, the process can exit at any time +without losing data and without an exit-time flush. + +Uses only the Python standard library (``sqlite3``), so it adds no dependency. + +Intentionally omitted from the full 1DS store (not needed for low-volume CLI +telemetry): per-event priority (``latency``), persistence classes, +reservation/leasing (``reserved_until``), per-row retry counters, tenant +multiplexing, and the ``settings`` table. The schema version is tracked with +SQLite's built-in ``PRAGMA user_version``. +""" + +import os +import sqlite3 +import threading +from typing import Optional + +SCHEMA_VERSION = 1 + + +class OfflineEventStore: + """Durable FIFO queue of serialized telemetry event payloads. + + All methods are best-effort and swallow storage errors: telemetry must + never crash the host application. Thread-safe via a per-instance lock; + tolerant of concurrent processes via WAL mode + ``busy_timeout``. + """ + + def __init__(self, db_path: str, max_records: int = 2048, busy_timeout_ms: int = 3000): + self._db_path = db_path + self._max_records = max_records + # When full, trim back to this watermark so we don't trim on every insert. + self._trim_target = max(1, (max_records * 3) // 4) + self._busy_timeout_ms = busy_timeout_ms + self._lock = threading.Lock() + self._conn: Optional[sqlite3.Connection] = None + self._initialize() + + def _initialize(self) -> None: + try: + os.makedirs(os.path.dirname(self._db_path), exist_ok=True) + except Exception: + pass + try: + conn = sqlite3.connect( + self._db_path, timeout=self._busy_timeout_ms / 1000.0, check_same_thread=False + ) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute(f"PRAGMA busy_timeout={self._busy_timeout_ms}") + conn.execute( + "CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY AUTOINCREMENT, payload BLOB NOT NULL)" + ) + if conn.execute("PRAGMA user_version").fetchone()[0] == 0: + conn.execute(f"PRAGMA user_version={SCHEMA_VERSION}") + conn.commit() + self._conn = conn + except Exception: + self._conn = None + + @property + def is_open(self) -> bool: + return self._conn is not None + + @property + def db_path(self) -> str: + return self._db_path + + def store(self, payload: bytes) -> bool: + """Append one serialized event; trims the oldest rows if over capacity.""" + if not payload: + return False + with self._lock: + if self._conn is None: + return False + try: + self._conn.execute("INSERT INTO events (payload) VALUES (?)", (sqlite3.Binary(payload),)) + count = self._conn.execute("SELECT COUNT(*) FROM events").fetchone()[0] + if count > self._max_records: + self._conn.execute( + "DELETE FROM events WHERE id IN (SELECT id FROM events ORDER BY id ASC LIMIT ?)", + (count - self._trim_target,), + ) + self._conn.commit() + return True + except Exception: + return False + + def get_batch(self, max_count: int) -> list[tuple[int, bytes]]: + """Return up to ``max_count`` oldest events as (id, payload) pairs.""" + with self._lock: + if self._conn is None: + return [] + try: + rows = self._conn.execute( + "SELECT id, payload FROM events ORDER BY id ASC LIMIT ?", + (max_count if max_count > 0 else -1,), + ).fetchall() + return [(r[0], bytes(r[1])) for r in rows] + except Exception: + return [] + + def delete(self, ids: list[int]) -> None: + """Remove rows by id (after a successful upload or a permanent drop).""" + if not ids: + return + with self._lock: + if self._conn is None: + return + try: + self._conn.executemany("DELETE FROM events WHERE id=?", [(i,) for i in ids]) + self._conn.commit() + except Exception: + pass + + def count(self) -> int: + with self._lock: + if self._conn is None: + return 0 + try: + return int(self._conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]) + except Exception: + return 0 + + def close(self) -> None: + with self._lock: + if self._conn is not None: + try: + self._conn.close() + except Exception: + pass + self._conn = None diff --git a/olive/telemetry/process_lock.py b/olive/telemetry/process_lock.py new file mode 100644 index 0000000000..24b103d427 --- /dev/null +++ b/olive/telemetry/process_lock.py @@ -0,0 +1,89 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- + +"""Cross-platform single-holder advisory lock (standard library only). + +Used so that, when several processes on one device share a telemetry database, +only one of them runs the uploader's drain loop at a time. Other processes keep +writing events durably to the store; the lock holder drains everyone's rows. +This avoids the same event being uploaded twice by concurrent drainers without +needing per-row reservation bookkeeping. + +The lock is an OS advisory lock on a sidecar file (``msvcrt`` on Windows, +``fcntl`` on POSIX). It is released explicitly and also by the OS when the +process exits, so a crashed holder never blocks other processes permanently. +""" + +import os +from typing import Optional + + +class ProcessDrainLock: + """Non-blocking exclusive advisory lock backed by a sidecar file.""" + + def __init__(self, lock_path: str): + self._lock_path = lock_path + self._fh = None + + @property + def held(self) -> bool: + return self._fh is not None + + def acquire(self) -> bool: + """Try to acquire the lock without blocking. Returns True if held.""" + if self._fh is not None: + return True + fh = None + try: + try: + os.makedirs(os.path.dirname(self._lock_path), exist_ok=True) + except Exception: + pass + fh = open(self._lock_path, "a+b") + if os.name == "nt": + import msvcrt + + fh.seek(0) + msvcrt.locking(fh.fileno(), msvcrt.LK_NBLCK, 1) + else: + import fcntl + + fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + self._fh = fh + return True + except Exception: + if fh is not None: + try: + fh.close() + except Exception: + pass + return False + + def release(self) -> None: + if self._fh is None: + return + fh = self._fh + self._fh = None + try: + if os.name == "nt": + import msvcrt + + try: + fh.seek(0) + msvcrt.locking(fh.fileno(), msvcrt.LK_UNLCK, 1) + except Exception: + pass + else: + import fcntl + + try: + fcntl.flock(fh.fileno(), fcntl.LOCK_UN) + except Exception: + pass + finally: + try: + fh.close() + except Exception: + pass diff --git a/olive/telemetry/recipe_telemetry.py b/olive/telemetry/recipe_telemetry.py new file mode 100644 index 0000000000..baf735e9e3 --- /dev/null +++ b/olive/telemetry/recipe_telemetry.py @@ -0,0 +1,432 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- +import functools +import json +import re +from copy import deepcopy +from os import PathLike +from pathlib import Path, PurePosixPath, PureWindowsPath +from typing import TYPE_CHECKING, Any, Optional, Union + +from olive.common.config_utils import load_config_file +from olive.common.utils import hash_dict +from olive.package_config import OlivePackageConfig +from olive.systems.common import SystemType +from olive.telemetry.telemetry import is_ci_environment + +if TYPE_CHECKING: + from olive.workflows.run.config import RunConfig + +RECIPE_HASH_REDACTED_VALUE = "" +CONFIG_REFERENCE_REDACTED_VALUE = "" +CONFIG_CALLABLE_REDACTED_VALUE = "" +RECIPE_HASH_REDACTED_KEYS = { + "output_dir", + "cache_dir", + "tempdir", + "additional_files", + "dockerfile", + "build_context_path", + "python_environment_path", + "prepend_to_path", + "script_dir", + "model_script", + # package_config is tracked separately via package_config_provided and + # package_config_overrides, but excluded from recipe_hash because it is an + # environment/infrastructure path. + "package_config", + "work_dir", +} +CONFIG_SNAPSHOT_REDACTED_KEYS = RECIPE_HASH_REDACTED_KEYS | { + "model_path", + "_name_or_path", + "adapter_path", + "user_script", +} +HF_MODEL_IDENTIFIER_KEYS = {"model_path", "_name_or_path"} +CONFIG_REFERENCE_KEYS = {"host", "target", "evaluator"} +LOCAL_MODEL_FILE_SUFFIXES = {".bin", ".model", ".onnx", ".pb", ".pt", ".pth", ".safetensors", ".tflite"} +HF_CACHE_MODEL_PATTERN = re.compile(r"(?:^|[\\/])models--([^\\/]+)--([^\\/]+)(?:[\\/]|$)") +HF_REPO_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*(/[A-Za-z0-9][A-Za-z0-9._-]*)?$") +_NO_OVERRIDE = object() + + +def _build_recipe_result_metadata( + run_config_input: Union[str, Path, dict], + run_config_telemetry_input: Optional[Any], + run_config: Optional["RunConfig"], + recipe_telemetry_metadata: Optional[dict[str, Any]], + *, + list_required_packages: bool, + package_config_input: Optional[Union[str, Path, dict]], + package_config_provided: bool, +) -> dict[str, Any]: + metadata = dict(recipe_telemetry_metadata or {}) + default_source, default_format = _classify_run_config_source(run_config_input) + metadata.setdefault("recipe_source", default_source) + metadata.setdefault("recipe_format", default_format) + metadata.setdefault("execution_mode", "list_required_packages" if list_required_packages else "run") + metadata.setdefault("package_config_provided", package_config_provided) + config_overrides = metadata.pop("config_overrides", _NO_OVERRIDE) + if config_overrides is _NO_OVERRIDE: + config_overrides = _build_config_overrides(run_config_telemetry_input) + elif not isinstance(config_overrides, str): + config_overrides = _build_config_overrides(config_overrides) + if config_overrides is not None: + metadata["config_overrides"] = config_overrides + if package_config_provided: + package_config_overrides = _build_package_config_overrides(package_config_input) + if package_config_overrides is not None: + metadata.setdefault("package_config_overrides", package_config_overrides) + metadata["is_ci"] = is_ci_environment() + + if run_config is None: + metadata.setdefault("recipe_name", metadata.get("recipe_command") or "WorkflowRun") + return metadata + + run_config_json = run_config.to_json(make_absolute=False) + model_metadata = _extract_input_model_metadata(run_config_json["input_model"]) + target_metadata = _extract_target_metadata(run_config) + host_metadata = _extract_host_metadata(run_config) + pass_types = _get_used_pass_types(run_config) + + metadata.setdefault("recipe_name", metadata.get("recipe_command") or run_config.workflow_id) + metadata.setdefault("workflow_id", run_config.workflow_id) + metadata.setdefault("recipe_hash", _build_recipe_hash(run_config_json)) + metadata.setdefault("input_model_type", run_config.input_model.type) + metadata.setdefault("input_model_source", model_metadata["input_model_source"]) + metadata.setdefault("model_task", model_metadata["model_task"]) + _set_metadata_if_present(metadata, target_metadata) + _set_metadata_if_present(metadata, host_metadata) + metadata.setdefault("pass_types", ";".join(pass_types)) + metadata.setdefault("pass_count", len(pass_types)) + metadata.setdefault("data_config_count", len(run_config.data_configs)) + metadata.setdefault("search_enabled", bool(run_config.engine.search_strategy)) + return metadata + + +def _classify_run_config_source(run_config_input: Any) -> tuple[str, str]: + if isinstance(run_config_input, dict): + return "config_dict", "dict" + + if isinstance(run_config_input, (str, PathLike)): + suffix = Path(run_config_input).suffix.lstrip(".").lower() + return "config_file", suffix or "unknown" + + return "config_object", "object" + + +def _build_config_overrides(config_input: Any) -> Optional[str]: + try: + config_data = _load_config_input_for_telemetry(config_input) + if config_data is None: + return None + + snapshot = _sanitize_config_snapshot(config_data) + if snapshot in (None, {}, []): + return None + + return json.dumps(snapshot, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + except Exception: + return None + + +def _build_package_config_overrides(config_input: Any) -> Optional[str]: + try: + config_data = _load_config_input_for_telemetry(config_input) + if not isinstance(config_data, dict): + return None + + default_config = _load_default_package_config_for_telemetry() + baseline = ( + _normalize_package_config_snapshot(default_config) if isinstance(default_config, dict) else _NO_OVERRIDE + ) + overrides = _extract_config_overrides(_normalize_package_config_snapshot(config_data), baseline) + if overrides is _NO_OVERRIDE: + return None + + snapshot = _sanitize_config_snapshot(overrides) + if not isinstance(snapshot, dict): + return None + + return json.dumps(snapshot, ensure_ascii=False, sort_keys=True, separators=(",", ":")) + except Exception: + return None + + +@functools.lru_cache +def _load_default_package_config_for_telemetry() -> Optional[dict[str, Any]]: + try: + default_config = load_config_file(OlivePackageConfig.get_default_config_path()) + except Exception: + return None + + return default_config if isinstance(default_config, dict) else None + + +def _normalize_package_config_snapshot(config_data: Any) -> Any: + if not isinstance(config_data, dict): + return config_data + + normalized = deepcopy(config_data) + passes = normalized.get("passes") + if isinstance(passes, dict): + normalized["passes"] = {str(pass_name).lower(): pass_config for pass_name, pass_config in passes.items()} + return normalized + + +def _extract_config_overrides(value: Any, baseline: Any = _NO_OVERRIDE) -> Any: + if baseline is _NO_OVERRIDE: + return deepcopy(value) + + if isinstance(value, dict) and isinstance(baseline, dict): + overrides = {} + for key, child_value in value.items(): + child_override = _extract_config_overrides(child_value, baseline.get(key, _NO_OVERRIDE)) + if child_override is not _NO_OVERRIDE: + overrides[key] = child_override + if overrides: + return overrides + return _NO_OVERRIDE if value == baseline else {} + + if isinstance(value, list): + if isinstance(baseline, list) and value == baseline: + return _NO_OVERRIDE + return deepcopy(value) + + if isinstance(value, tuple): + value_list = list(value) + baseline_list = list(baseline) if isinstance(baseline, tuple) else baseline + if isinstance(baseline_list, list) and value_list == baseline_list: + return _NO_OVERRIDE + return value_list + + return deepcopy(value) if value != baseline else _NO_OVERRIDE + + +def _load_config_input_for_telemetry(config_input: Any) -> Optional[Any]: + if config_input is None: + return None + if isinstance(config_input, dict): + return deepcopy(config_input) + if isinstance(config_input, (str, PathLike)): + return load_config_file(config_input) + + model_dump = getattr(config_input, "model_dump", None) + if callable(model_dump): + return model_dump(exclude_defaults=True, exclude_none=True, by_alias=True) + return None + + +def _sanitize_config_snapshot(value: Any, key: Optional[str] = None, model_type: Optional[str] = None) -> Any: + if key in HF_MODEL_IDENTIFIER_KEYS: + if str(model_type).lower() == "hfmodel": + hf_model_id = _extract_huggingface_model_id(value) + if hf_model_id: + return hf_model_id + return RECIPE_HASH_REDACTED_VALUE + if key in CONFIG_SNAPSHOT_REDACTED_KEYS or _is_path_like_key(key): + return RECIPE_HASH_REDACTED_VALUE + if key in CONFIG_REFERENCE_KEYS and isinstance(value, str): + return CONFIG_REFERENCE_REDACTED_VALUE + + if isinstance(value, dict): + child_model_type = _get_model_type(value) or model_type + if key == "systems": + return [_sanitize_config_snapshot(system, "system", child_model_type) for system in value.values()] + if key == "passes": + passes = [] + for pass_configs in value.values(): + if isinstance(pass_configs, list): + passes.extend(pass_configs) + else: + passes.append(pass_configs) + return [_sanitize_config_snapshot(pass_config, "pass", child_model_type) for pass_config in passes] + if key == "evaluators": + return [ + _sanitize_config_snapshot(evaluator, "evaluator_config", child_model_type) + for evaluator in value.values() + ] + return { + child_key: _sanitize_config_snapshot(child_value, child_key, child_model_type) + for child_key, child_value in value.items() + if child_value is not None + } + if isinstance(value, list): + return [_sanitize_config_snapshot(item, key, model_type) for item in value] + if isinstance(value, tuple): + return [_sanitize_config_snapshot(item, key, model_type) for item in value] + if isinstance(value, Path): + return RECIPE_HASH_REDACTED_VALUE + if callable(value): + return CONFIG_CALLABLE_REDACTED_VALUE + if isinstance(value, (str, int, float, bool)) or value is None: + return value + if hasattr(value, "value") and isinstance(value.value, (str, int, float, bool)): + return value.value + return f"<{type(value).__name__}>" + + +def _is_path_like_key(key: Optional[str]) -> bool: + if key is None: + return False + return key in {"path", "paths", "dir", "dirs", "file", "files"} or key.endswith( + ("_path", "_paths", "_dir", "_dirs", "_file", "_files") + ) + + +def _get_model_type(config: dict[str, Any]) -> Optional[str]: + model_type = config.get("type") + return str(model_type).lower() if model_type is not None else None + + +def _extract_huggingface_model_id(model_identifier: Any) -> Optional[str]: + if not isinstance(model_identifier, str): + return None + + identifier = model_identifier.strip() + if not identifier: + return None + + if identifier.startswith("https://huggingface.co/"): + parts = identifier.removeprefix("https://huggingface.co/").strip("/").split("/") + if len(parts) >= 2: + return f"{parts[0]}/{parts[1]}" + if parts and parts[0]: + return parts[0] + + if match := HF_CACHE_MODEL_PATTERN.search(identifier): + return f"{match.group(1)}/{match.group(2)}" + + if HF_REPO_ID_PATTERN.match(identifier) and not _has_local_model_file_suffix(identifier): + return identifier + + return None + + +def _extract_input_model_metadata(input_model_config: dict[str, Any]) -> dict[str, Optional[str]]: + model_config = input_model_config.get("config", {}) + model_attributes = model_config.get("model_attributes", {}) + model_task = model_attributes.get("hf_task") or model_config.get("task") + raw_identifier = model_attributes.get("_name_or_path") or model_config.get("model_path") + return { + "input_model_source": _classify_input_model_source(raw_identifier), + "model_task": str(model_task) if model_task is not None else None, + } + + +def _classify_input_model_source(model_identifier: Any) -> str: + if model_identifier is None: + return "unknown" + if isinstance(model_identifier, dict): + resource_type = model_identifier.get("type") + if resource_type == "azureml_registry_model": + return "azureml" + return "structured_resource" + + identifier = str(model_identifier) + if identifier.startswith("azureml://"): + return "azureml" + if identifier.startswith("https://huggingface.co/"): + return "huggingface_url" + if identifier.startswith(("http://", "https://")): + return "url" + + if _is_explicit_local_model_path(identifier): + suffix = PureWindowsPath(identifier).suffix or PurePosixPath(identifier).suffix + return "local_file" if suffix else "local_folder" + return "string_name" + + +def _is_explicit_local_model_path(identifier: str) -> bool: + if _has_local_model_file_suffix(identifier): + return True + return ( + identifier.startswith(("./", "../", ".\\", "..\\", "~/", "~\\", "/", "\\\\")) + or PureWindowsPath(identifier).is_absolute() + or PurePosixPath(identifier).is_absolute() + ) + + +def _has_local_model_file_suffix(identifier: str) -> bool: + suffix = PureWindowsPath(identifier).suffix or PurePosixPath(identifier).suffix + return suffix.lower() in LOCAL_MODEL_FILE_SUFFIXES + + +def _extract_target_metadata(run_config: "RunConfig") -> dict[str, Optional[str]]: + target_system = run_config.engine.target + return _extract_system_metadata(target_system, "target") + + +def _extract_host_metadata(run_config: "RunConfig") -> dict[str, Optional[str]]: + host_system = run_config.engine.host + if host_system is None: + return { + "host_system_type": SystemType.Local.value, + } + return _extract_system_metadata(host_system, "host") + + +def _extract_system_metadata(system_config: Optional[Any], field_prefix: str) -> dict[str, Optional[str]]: + system_type = system_config.type.value if system_config is not None else None + device = None + execution_provider = None + execution_providers = None + + accelerators = system_config.config.accelerators if system_config and system_config.config else None + if accelerators: + accelerator = accelerators[0] + device = str(accelerator.device) if accelerator.device is not None else None + ep_values = accelerator.get_ep_strs() or [] + if ep_values: + execution_provider = ep_values[0] + execution_providers = ";".join(ep_values) + + return { + f"{field_prefix}_system_type": system_type, + f"{field_prefix}_device": device, + f"{field_prefix}_execution_provider": execution_provider, + f"{field_prefix}_execution_providers": execution_providers, + } + + +def _set_metadata_if_present(metadata: dict[str, Any], values: dict[str, Optional[str]]) -> None: + for key, value in values.items(): + if value is not None: + metadata.setdefault(key, value) + + +def _get_used_pass_types(run_config: "RunConfig") -> list[str]: + return ( + [pass_config.type for _, pass_configs in run_config.passes.items() for pass_config in pass_configs] + if run_config.passes + else [] + ) + + +def _build_recipe_hash(run_config_json: dict[str, Any]) -> str: + sanitized = deepcopy(run_config_json) + _redact_recipe_hash_keys(sanitized) + return hash_dict(sanitized)[:16] + + +def _redact_recipe_hash_keys(value: Any, key: Optional[str] = None) -> Any: + if key in RECIPE_HASH_REDACTED_KEYS or _is_path_like_key(key): + return RECIPE_HASH_REDACTED_VALUE + if isinstance(value, dict): + for child_key in list(value): + value[child_key] = _redact_recipe_hash_keys(value[child_key], child_key) + elif isinstance(value, list): + for index, item in enumerate(value): + value[index] = _redact_recipe_hash_keys(item, key) + elif isinstance(value, tuple): + return [_redact_recipe_hash_keys(item, key) for item in value] + elif isinstance(value, Path): + return RECIPE_HASH_REDACTED_VALUE + elif callable(value): + return CONFIG_CALLABLE_REDACTED_VALUE + elif hasattr(value, "value") and isinstance(value.value, (str, int, float, bool)): + return value.value + return value diff --git a/olive/telemetry/telemetry.py b/olive/telemetry/telemetry.py index 0ddb690e2a..52d9e548ca 100644 --- a/olive/telemetry/telemetry.py +++ b/olive/telemetry/telemetry.py @@ -2,34 +2,43 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. # -------------------------------------------------------------------------- -"""Thin wrapper around the OneCollector telemetry logger with event helpers.""" +"""Telemetry singleton backed by a durable SQLite event queue. + +Events are serialized to Common Schema JSON and written to a per-app SQLite +store; a background uploader drains the store to Microsoft OneCollector. Because +every event is persisted before any network call, the process can exit at any +time without losing data and without an exit-time flush. The pipeline uses only +the Python standard library (no OpenTelemetry, no requests). +""" import base64 -import errno -import json import os import platform import threading -import time -from pathlib import Path -from typing import TYPE_CHECKING, Any, Optional +import uuid +from datetime import datetime, timezone +from typing import Any, Optional from olive.telemetry.constants import CONNECTION_STRING from olive.telemetry.deviceid import get_encrypted_device_id_and_status from olive.telemetry.library.event_source import event_source -from olive.telemetry.library.telemetry_logger import TelemetryLogger, get_telemetry_logger -from olive.telemetry.utils import ( - _decode_cache_line, - _encode_cache_line, - _exclusive_file_lock, - get_telemetry_base_dir, -) +from olive.telemetry.library.options import OneCollectorExporterOptions +from olive.telemetry.library.serialization import CommonSchemaJsonSerializationHelper +from olive.telemetry.offline_store import OfflineEventStore +from olive.telemetry.uploader import EventUploader +from olive.telemetry.utils import get_telemetry_base_dir -if TYPE_CHECKING: - from olive.telemetry.library.callback_manager import PayloadTransmittedCallbackArgs +try: + from olive.version import __version__ as VERSION +except Exception: + VERSION = "unknown" # Default event names used by the high-level telemetry helpers. HEARTBEAT_EVENT_NAME = "OliveHeartbeat" +RECIPE_EVENT_NAME = "OliveRecipe" +ACTION_EVENT_NAME = "OliveAction" +ERROR_EVENT_NAME = "OliveError" +APP_NAME = "Olive" # CI/CD environment variables whose presence indicates an automated pipeline. _CI_ENV_VARS = ( @@ -41,17 +50,15 @@ "BUILDKITE", # Buildkite "SYSTEM_TEAMFOUNDATIONCOLLECTIONURI", # Azure DevOps ) -ACTION_EVENT_NAME = "OliveAction" -ERROR_EVENT_NAME = "OliveError" ALLOWED_KEYS = { HEARTBEAT_EVENT_NAME: { "device_id", - "id_status", - "os.name", - "os.version", - "os.release", - "os.arch", + "device_id_status", + "os", + "os_version", + "os_release", + "os_arch", "app_version", "app_instance_id", "initTs", @@ -72,386 +79,67 @@ "app_instance_id", "initTs", }, + RECIPE_EVENT_NAME: { + "recipe_name", + "recipe_hash", + "recipe_source", + "recipe_format", + "recipe_command", + "execution_mode", + "workflow_id", + "config_overrides", + "success", + "input_model_type", + "input_model_source", + "model_task", + "target_system_type", + "target_device", + "target_execution_provider", + "target_execution_providers", + "host_system_type", + "host_device", + "host_execution_provider", + "host_execution_providers", + "pass_types", + "pass_count", + "data_config_count", + "search_enabled", + "package_config_provided", + "package_config_overrides", + "is_ci", + "app_version", + "app_instance_id", + "initTs", + }, } CRITICAL_EVENTS = {HEARTBEAT_EVENT_NAME} -MAX_CACHE_SIZE_BYTES = 5 * 1024 * 1024 -HARD_MAX_CACHE_SIZE_BYTES = 10 * 1024 * 1024 -CACHE_FILE_NAME = "olive.json" - - -class TelemetryCacheHandler: - """Handles caching of failed telemetry events for offline resilience. - - Design decisions: - - Single shared cache file (olive.json) for simplicity - - Cache writes are synchronous (fast JSON operations don't need async) - - Cache flush runs in a separate thread (slow network I/O) - - Flush triggered on success when cached events exist - - All critical sections protected by lock to prevent race conditions - - Newline-delimited JSON format for human readability and partial corruption recovery - - Assumptions: - - File I/O (JSON lines) is fast enough for synchronous execution (~microseconds) - - Network I/O is slow and should not block the callback thread - - Successful send indicates network is available to retry cached events - - Cache persists across sessions for offline resilience - """ - - def __init__(self, telemetry: "Telemetry") -> None: - self._telemetry = telemetry - # Single shared cache file for all processes - self._cache_file_name = CACHE_FILE_NAME - self._shutdown = False - # Protects all shared state to prevent race conditions - self._lock = threading.Lock() - self._callback_condition = threading.Condition() - self._callbacks_item_count = 0 - self._events_logged = 0 - # Prevents concurrent flush operations - self._is_flushing = False - - def shutdown(self) -> None: - """Signal shutdown to prevent new operations. - - Note: Does NOT flush the cache. Cache persists across sessions for - offline resilience. If network is working, success callbacks already - flushed. If network is down, flushing would fail anyway. - """ - with self._lock: - self._shutdown = True - - def __del__(self): - """Cleanup cache handler resources on garbage collection. - - Safety net to ensure shutdown is called even if not done explicitly. - """ - try: - self.shutdown() - except Exception: - # Silently ignore errors during cleanup - pass - - def on_payload_transmitted(self, args: "PayloadTransmittedCallbackArgs") -> None: - """Telemetry payload transmission callback. - - Design decisions: - - Ignore callbacks during flush (unlikely to fail during successful flush) - - On success: flush cache if any cached events exist - - On failure: write to cache immediately (synchronous for simplicity) - - Assumptions: - - Successful transmission indicates network is available to retry cached events - - If flush is in progress, we already successfully sent an event, so unlikely an event would suddenly fail - - Multiple concurrent successes don't need multiple flush operations - - Failed payloads should be cached immediately to avoid loss - """ - try: - payload = None - should_flush = False - - with self._lock: - if self._shutdown: - return - - # Skip callbacks from replayed events during flush - # If a flush is in progress it means we successfully sent an event, - # so it's unlikely that an event would suddenly fail and need to be cached - # and we don't need to flush again. - if self._is_flushing: - with self._callback_condition: - self._callbacks_item_count += args.item_count - self._callback_condition.notify_all() - return - - if args.succeeded: - # Only flush if cache exists and no flush is in progress - cache_path = self.cache_path - if cache_path and cache_path.exists(): - should_flush = True - else: - payload = args.payload_bytes - - if should_flush: - # Release lock before scheduling (flush runs in separate thread) - self._schedule_flush() - elif payload: - # Write synchronously - JSON operations are fast enough - self._write_payload_to_cache(payload) - except Exception: - # Fail silently - telemetry should never crash the application - pass - finally: - with self._callback_condition: - self._callbacks_item_count += args.item_count - self._callback_condition.notify_all() - - def wait_for_callbacks(self, timeout_sec: float, during_flush: bool = False) -> bool: - deadline = time.time() + timeout_sec - while True: - with self._callback_condition: - callbacks_item_count = self._callbacks_item_count - expected_items = self._events_logged - if (during_flush or not self.is_flushing) and callbacks_item_count >= expected_items: - return True - remaining = deadline - time.time() - if remaining <= 0: - return False - with self._callback_condition: - self._callback_condition.wait(timeout=remaining) - - def record_event_logged(self, count: int = 1) -> None: - with self._callback_condition: - self._events_logged += count - - def _schedule_flush(self) -> None: - """Schedule cache flush in a separate thread to avoid blocking the callback. - - Design decisions: - - Check _is_flushing before spawning thread to avoid unnecessary threads - - Run flush in daemon thread (don't block process exit) - - Acquire lock at start to set _is_flushing flag atomically - - Always clear _is_flushing flag even if flush fails - - Assumptions: - - Flush operations are slow (network I/O) and should not block callbacks - - Daemon thread is acceptable (flush is best-effort) - """ - # Check before spawning thread to avoid unnecessary thread creation - with self._lock: - if self._shutdown or self._is_flushing: - return - self._is_flushing = True - - def flush_task(): - try: - self._flush_cache() - except Exception: - # Fail silently - pass - finally: - # Always clear flag, even on exception - with self._lock: - self._is_flushing = False - - thread = threading.Thread(target=flush_task, daemon=True) - thread.start() - - @property - def cache_path(self) -> Optional[Path]: - """Get the path to the telemetry cache file. - - Returns: - Optional[Path]: Path to cache file, or None if base directory unavailable. - - """ - telemetry_cache_dir = None - if "OLIVE_TELEMETRY_CACHE_DIR" in os.environ: - telemetry_cache_dir = os.environ["OLIVE_TELEMETRY_CACHE_DIR"] - if not telemetry_cache_dir: - telemetry_cache_dir = get_telemetry_base_dir() / "cache" - return telemetry_cache_dir / self._cache_file_name - - def _write_payload_to_cache(self, payload: bytes) -> None: - """Write failed telemetry payload to cache for later retry. - - Design decisions: - - Parse payload to extract individual events (allows filtering) - - Filter to only critical events near size limit (preserves important data) - - Use file locking for multi-process safety (prevents corruption) - - Use exponential backoff for file contention (avoids spinning) - - Fail silently on errors (telemetry should never crash app) - - Assumptions: - - JSON operations are fast enough for synchronous execution - - File contention is rare and transient (retry a few times) - - Cache size limits prevent unbounded growth - - Critical events (heartbeat) are more important than others - """ - try: - cache_path = self.cache_path - if cache_path is None: - return - - # Parse payload into individual events for filtering - entries = _parse_payload(payload) - if not entries: - return - - cache_path.parent.mkdir(parents=True, exist_ok=True) - - max_retries = 3 - for attempt in range(max_retries + 1): - try: - cache_size = cache_path.stat().st_size if cache_path.exists() else 0 - - # Hard limit: stop caching entirely to prevent unbounded growth - if cache_size >= HARD_MAX_CACHE_SIZE_BYTES: - return - - # Soft limit: keep only critical events to preserve space - if cache_size >= MAX_CACHE_SIZE_BYTES: - entries = [entry for entry in entries if entry["event_name"] in CRITICAL_EVENTS] - if not entries: - return - - # Append base64-encoded newline-delimited entries - # Use exclusive file lock for multi-process safety - with _exclusive_file_lock(cache_path, mode="a") as cache_file: - for entry in entries: - plain = json.dumps(entry, ensure_ascii=False, separators=(",", ":")) - cache_file.write(_encode_cache_line(plain) + "\n") - return - except OSError as exc: - # Retry only on transient access errors (file locked by another process) - if exc.errno not in {errno.EACCES, errno.EAGAIN, errno.EWOULDBLOCK, errno.EBUSY}: - return - if attempt >= max_retries: - return - # Exponential backoff: 50ms, 100ms, 200ms (aligned with C# implementation) - time.sleep(0.05 * (2**attempt)) - except Exception: - # Fail silently - telemetry errors should not crash the application - return - - def _flush_cache(self) -> None: - """Flush this process's cached events back to telemetry service.""" - cache_path = self.cache_path - if cache_path is None or not cache_path.exists(): - return - - self._flush_cache_file(cache_path) - - def _flush_cache_file(self, cache_path: Path) -> None: - """Flush cached events back to telemetry service. - - Approach: - 1. Atomically rename cache → .flush (claims ownership, prevents concurrent flushes) - 2. Read all events from .flush file - 3. Queue all events for sending via telemetry logger - 4. Force flush with 2-second timeout - 5. On success: delete .flush file - 6. On failure: restore .flush → cache for retry - - Multi-process coordination: - - `replace()` is atomic; only one process can successfully rename the cache file - - If another process already renamed it, we get FileNotFoundError and abort - - Stale .flush files from crashes are overwritten by the atomic rename - - Shutdown handling: - - If shutdown flag set during flush, restore cache before returning - - This preserves events even if callbacks don't fire during shutdown - - Callback behavior: - - Queued events trigger callbacks with success/failure - - Failed events are automatically re-cached via callbacks (unless shutting down) - - The _is_flushing flag prevents re-caching of replayed events during flush - """ - flush_path = None - try: - # Check shutdown before starting (under lock to prevent race) - with self._lock: - if self._shutdown: - return - - if not cache_path.exists(): - return - - # Atomically rename to .flush file to claim ownership - # Overwrite any stale .flush file from crashed process (C# pattern) - flush_path = cache_path.with_name(f"{cache_path.name}.flush") - try: - # On Windows/POSIX, replace() overwrites existing files atomically - cache_path.replace(flush_path) - except FileNotFoundError: - # Cache already claimed by another flush or doesn't exist - return - # Read all cached entries (base64-decoded) - entries = _read_cache_entries(flush_path) +# Per-app database file. Olive and other apps use separate files so a process +# never drains another app's events (which carry a different tenant key). +DB_FILE_NAME = "olive_telemetry.db" - if not entries: - # Empty cache, just delete the flush file - flush_path.unlink(missing_ok=True) - return - # Replay all events through telemetry logger - # Note: _is_flushing flag (set by caller) prevents these callbacks from re-caching or triggering nested flushes - # (unlikely since we just successfully sent an event, indicating network is available) - for entry in entries: - try: - event_name = entry["event_name"] - event_data = entry["event_data"] - if not event_name or not event_data: - continue - attributes = json.loads(event_data) - if not isinstance(attributes, dict): - continue - # Preserve original timestamp - attributes["initTs"] = entry.get("initTs", entry["ts"]) - self._telemetry.log(event_name, attributes, None) - except Exception: - # Skip malformed entries - continue - - # Check if shutdown happened during flush - with self._lock: - if self._shutdown: - # Restore cache to avoid data loss during shutdown - if flush_path and flush_path.exists(): - try: - cache_path.parent.mkdir(parents=True, exist_ok=True) - flush_path.replace(cache_path) - except Exception: - # Silently ignore errors during cleanup - pass - return - - # Wait for in-flight callbacks to complete before deciding success/failure - flush_success = self.wait_for_callbacks(timeout_sec=5.0, during_flush=True) - if flush_success: - # Success: delete the flush file (events were sent) - if flush_path: - flush_path.unlink(missing_ok=True) - elif flush_path and flush_path.exists(): - # Failure: restore cache for retry later - cache_path.parent.mkdir(parents=True, exist_ok=True) - flush_path.replace(cache_path) - except Exception: - # Best-effort restore on any exception to prevent data loss - if flush_path and flush_path.exists(): - try: - cache_path.parent.mkdir(parents=True, exist_ok=True) - flush_path.replace(cache_path) - except Exception: - # If restore fails, we lose the data (acceptable for telemetry) - pass - return - - @property - def is_flushing(self) -> bool: - with self._lock: - return self._is_flushing +def is_ci_environment() -> bool: + """Detect CI/CD environments by checking well-known environment variables.""" + return any(os.environ.get(var) for var in _CI_ENV_VARS) class Telemetry: - """Wrapper that wires environment configuration into the library logger. + """Per-process singleton that persists events to SQLite and uploads them. - This is a singleton class - all instances share the same state. + Separate processes get separate in-memory singletons and coordinate only + through the shared SQLite store and its single-drainer file lock. Use Telemetry() to get the singleton instance. """ _instance: Optional["Telemetry"] = None - _lock = threading.Lock() + _lock = threading.RLock() def __new__(cls): - """Create or return the singleton instance. - - Thread-safe singleton implementation using double-checked locking. - """ + """Create or return the singleton instance.""" if cls._instance is None: with cls._lock: - # Double-check pattern to prevent race conditions if cls._instance is None: instance = super().__new__(cls) instance._initialized = False @@ -459,71 +147,85 @@ def __new__(cls): return cls._instance def __init__(self): - """Initialize the telemetry logger (only runs once for singleton).""" - # Prevent re-initialization - if self._initialized: - return - - self._logger = None - self._cache_handler = None - - try: - self._logger = self._create_logger() - event_source.disable() - - self._cache_handler = TelemetryCacheHandler(self) - self._setup_payload_callbacks() - if self._is_ci_environment(): - self.disable_telemetry() - self._initialized = True + """Initialize the telemetry store and uploader (runs once).""" + with self._lock: + if self._initialized: return - self._log_heartbeat() - if os.environ.get("OLIVE_DISABLE_TELEMETRY") == "1": - self.disable_telemetry() - self._initialized = True - except Exception: - # Fail silently — telemetry must never crash the host application + # Mark initialized under the lock before doing any work, so two + # threads whose first Telemetry() calls interleave cannot both run + # the body (which would create two uploaders and two heartbeats). self._initialized = True - @staticmethod - def _is_ci_environment() -> bool: - """Detect CI/CD environments by checking well-known environment variables.""" - return any(os.environ.get(var) for var in _CI_ENV_VARS) + self._store: Optional[OfflineEventStore] = None + self._uploader: Optional[EventUploader] = None + self._enabled = True + self._recipe_only_ci_telemetry = False + self._global_metadata: dict[str, Any] = {} + self._instrumentation_key = "" + self._envelope_ikey = "" + self._app_instance_id = uuid.uuid4().hex + self._heartbeat_thread: Optional[threading.Thread] = None - def _create_logger(self) -> Optional[TelemetryLogger]: - try: - return get_telemetry_logger(base64.b64decode(CONNECTION_STRING).decode()) - except Exception: - return None + try: + # User opt-out (OLIVE_DISABLE_TELEMETRY=1): detailed events are + # not recorded, but the device-id heartbeat is still written + # (durably) so device counting keeps working. CI is handled via + # recipe-only mode below and never sends a heartbeat. + user_opt_out = os.environ.get("OLIVE_DISABLE_TELEMETRY") == "1" + + options = OneCollectorExporterOptions(connection_string=base64.b64decode(CONNECTION_STRING).decode()) + options.validate() + self._instrumentation_key = options.instrumentation_key + self._envelope_ikey = ( + f"{CommonSchemaJsonSerializationHelper.ONE_COLLECTOR_TENANCY_SYMBOL}:{options.tenant_token}" + ) - def _setup_payload_callbacks(self) -> None: - # Register callback for payload transmission events - # No need to store unregister function - logger shutdown will clean up callbacks - if self._logger is None: - return - self._logger.register_payload_transmitted_callback( - self._cache_handler.on_payload_transmitted, - include_failures=True, - ) + event_source.disable() - def add_global_metadata(self, metadata: dict[str, Any]) -> None: - """Add metadata to all telemetry events. + # In CI, only recipe events are sent (no heartbeat, no + # action/error); this is independent of user opt-out. + self._recipe_only_ci_telemetry = is_ci_environment() - Args: - metadata: Dictionary of metadata key-value pairs to add to all events. - These will be included in every telemetry event sent. + # Opt-out + CI: record and send nothing at all. + if user_opt_out and self._recipe_only_ci_telemetry: + self._enabled = False + return - Example: - >>> telemetry = Telemetry() - >>> telemetry.add_global_metadata({"user_id": "12345", "environment": "production"}) + # Detailed events are recorded only when enabled; the heartbeat + # ignores this gate. + self._enabled = not user_opt_out + + # Durable on-disk queue + background uploader. The uploader + # retries until delivery, which makes the device-id heartbeat + # reliable even on opt-out. + db_path = os.path.join(get_telemetry_base_dir(), DB_FILE_NAME) + self._store = OfflineEventStore(db_path) + self._uploader = EventUploader(self._store, instrumentation_key=self._instrumentation_key) + self._uploader.start() + + # The device-id heartbeat is written to the durable store, not + # sent directly. It is suppressed in CI (recipe-only mode). + if not self._recipe_only_ci_telemetry: + self._start_heartbeat() + except Exception: + # Fail silently — telemetry must never crash the host application + self._store = None + self._uploader = None + self._enabled = False + + def _start_heartbeat(self) -> None: + """Send the device-id heartbeat on a background daemon thread.""" + self._heartbeat_thread = threading.Thread( + target=self._send_heartbeat, name="olive-telemetry-heartbeat", daemon=True + ) + self._heartbeat_thread.start() - """ + def add_global_metadata(self, metadata: dict[str, Any]) -> None: + """Merge metadata into every subsequent telemetry event.""" try: - if self._logger is None: - return - self._logger.add_global_metadata(metadata) + if metadata: + self._global_metadata.update(metadata) except Exception: - # Fail silently — telemetry must never crash the host application pass def log( @@ -532,110 +234,117 @@ def log( attributes: Optional[dict[str, Any]] = None, metadata: Optional[dict[str, Any]] = None, ) -> None: - """Log a telemetry event. - - Args: - event_name: Name of the event to log (e.g., "UserLogin", "ModelTrained"). - attributes: Optional dictionary of event-specific attributes. - metadata: Optional dictionary of additional metadata to merge with attributes. - - Example: - >>> telemetry = Telemetry() - >>> telemetry.log("ModelOptimized", {"model_type": "bert", "duration_ms": 1500}) - - """ + """Log a telemetry event (persisted durably, uploaded in the background).""" try: - attrs = _merge_metadata(attributes, metadata) - if self._logger is None: + if not self._enabled or self._store is None: + return + if self._recipe_only_ci_telemetry and event_name != RECIPE_EVENT_NAME: return - self._logger.log(event_name, attrs) - if self._cache_handler: - self._cache_handler.record_event_logged() + payload = self._build_payload(event_name, attributes, metadata) + if payload is None: + return + self._store.store(payload) + if self._uploader is not None: + self._uploader.request_drain() except Exception: # Fail silently — telemetry must never crash the host application pass - def _log_heartbeat( + def _build_payload( self, + event_name: str, + attributes: Optional[dict[str, Any]], metadata: Optional[dict[str, Any]] = None, - ) -> None: - """Log a heartbeat event with system information. + ) -> Optional[bytes]: + """Merge metadata, filter to whitelisted keys, and serialize one event. + + Returns the Common Schema JSON bytes, or None if the event is not + whitelisted or filters to nothing. + """ + attrs = _merge_metadata(attributes, metadata) + if self._global_metadata: + attrs = {**self._global_metadata, **attrs} + filtered = _filter_event_data(event_name, attrs) + if not filtered: + # Unknown/empty event: not whitelisted. + return None + filtered.setdefault("app_version", VERSION) + filtered.setdefault("app_instance_id", self._app_instance_id) + envelope = CommonSchemaJsonSerializationHelper.create_event_envelope( + event_name=event_name, + timestamp=datetime.now(timezone.utc), + ikey=self._envelope_ikey, + data=filtered, + ) + return CommonSchemaJsonSerializationHelper.serialize_to_json_bytes(envelope) - Args: - metadata: Optional additional metadata to include. + def _send_heartbeat(self, metadata: Optional[dict[str, Any]] = None) -> None: + """Enqueue the device-id heartbeat in the durable store. + Runs on a background thread on every non-CI run (including user opt-out) + so device counting works and is retried until delivered. The heartbeat + deliberately ignores the ``_enabled`` gate that suppresses detailed + events on opt-out; only detailed events are withheld from opted-out + users. """ + if self._store is None: + return try: encrypted_device_id, device_id_status = get_encrypted_device_id_and_status() attributes = { "device_id": encrypted_device_id, - "id_status": device_id_status.value, - "os": { - "name": platform.system().lower(), - "version": platform.version(), - "release": platform.release(), - "arch": platform.machine(), - }, + "device_id_status": device_id_status.value, + "os": platform.system(), + "os_version": platform.version(), + "os_release": platform.release(), + "os_arch": platform.machine(), } - self.log(HEARTBEAT_EVENT_NAME, attributes, metadata) + payload = self._build_payload(HEARTBEAT_EVENT_NAME, attributes, metadata) + if payload is None: + return + self._store.store(payload) + if self._uploader is not None: + self._uploader.request_drain() except Exception: - # Fail silently — telemetry must never crash the host application pass def disable_telemetry(self) -> None: - """Disable all telemetry logging. - - After calling this method, no telemetry events will be sent until - telemetry is explicitly re-enabled. - """ + """Disable telemetry and stop the background uploader (non-blocking).""" try: - if self._logger is None: - return - self._logger.disable_telemetry() + self._enabled = False + if self._uploader is not None: + # Non-blocking: signal the daemon thread to wind down without + # joining, so opting out never blocks the caller. + self._uploader.signal_stop() + self._uploader = None except Exception: - # Fail silently — telemetry must never crash the host application pass def shutdown(self, timeout_millis: float = 10_000, callback_timeout_millis: float = 2_000) -> None: - """Shutdown telemetry and flush pending events. - - Shutdown sequence: - 1. Wait for in-flight flush to complete (up to 1 second) - 2. Wait for callbacks + signal shutdown to cache handler - 3. Shutdown logger (cleans up callbacks automatically) + """Stop the background uploader without blocking process exit. + + Delivery does not depend on a flush here: durability guarantees that any + undelivered events remain in the on-disk store and are uploaded on the + next run (or by a concurrently-running process). We deliberately do NOT + perform synchronous network I/O at shutdown, because Olive's CLI calls + this on every exit and a blocked/unreachable collector would otherwise + stall exit for the full send timeout. """ try: - # Step 1: Wait for pending flush to complete (matches C# 1-second timeout) - start_time = time.time() - while time.time() - start_time < 1.0: - if not self._cache_handler or not self._cache_handler.is_flushing: - break - time.sleep(0.05) - - # Step 2: Wait for callbacks/flush to complete before shutting down cache handler - if self._cache_handler: - # Nothing can be done if callbacks don't complete in time, so we ignore the result - _ = self._cache_handler.wait_for_callbacks(callback_timeout_millis / 1000) - self._cache_handler.shutdown() - - # Step 3: Shutdown logger (callbacks cleaned up automatically) - if self._logger is not None: - self._logger.shutdown() + if self._uploader is not None: + self._uploader.signal_stop() + self._uploader = None + if self._store is not None: + self._store.close() except Exception: # Fail silently — telemetry must never crash the host application pass def __del__(self): - """Cleanup telemetry resources on garbage collection. - - This is a safety net to ensure resources are cleaned up even if - shutdown() is not explicitly called. However, relying on __del__ - is not recommended - always call shutdown() explicitly when done. - """ + """Safety-net cleanup on garbage collection.""" try: self.shutdown() except Exception: - # Silently ignore errors during cleanup pass @@ -651,66 +360,12 @@ def _merge_metadata(attributes: Optional[dict[str, Any]], metadata: Optional[dic return merged -def _parse_payload(payload: bytes) -> list[dict[str, Any]]: - """Parse telemetry payload into individual event entries. - - Design decisions: - - Filter events to only allowed keys (privacy/security) - - Store as minimal JSON (reduces cache size) - - Fail silently on malformed data (telemetry should be robust) - - Assumptions: - - Payload is newline-delimited JSON (OneCollector format) - - Events have "name", "time", and "data" fields - - Only whitelisted events and fields should be cached - """ - entries = [] - try: - payload_text = payload.decode("utf-8") - lines = payload_text.splitlines() - - for raw_line in lines: - line = raw_line.strip() - if not line: - continue - try: - event = json.loads(line) - event_name = event["name"] - if not event_name: - continue - # Filter to only allowed keys for privacy/security - filtered_data = _filter_event_data(event_name, event["data"]) - if not filtered_data: - continue - entries.append( - { - "ts": event["time"] or time.time(), - "event_name": event_name, - # Compact JSON to reduce cache size - "event_data": json.dumps(filtered_data, ensure_ascii=False, separators=(",", ":")), - } - ) - except Exception: - # Skip malformed lines - continue - except Exception: - # If entire payload is malformed, return empty list - return [] - - return entries - - def _filter_event_data(event_name: str, data: dict[str, Any]) -> Optional[dict[str, Any]]: """Filter event data to only allowed keys for privacy/security. - Design decisions: - - Whitelist approach (only explicitly allowed keys are included) - - Support nested keys with dot notation (e.g., "os.name") - - Return None if no allowed keys found (filters out unknown events) - - Assumptions: - - ALLOWED_KEYS dict defines all cacheable events and their fields - - Unknown events should not be cached (privacy/security) + Whitelist approach: only explicitly allowed keys (with dot-notation support + for nested values, e.g. "os.name") are kept. Returns None for unknown events + so they are neither persisted nor sent. """ if event_name not in ALLOWED_KEYS: return None @@ -740,37 +395,3 @@ def _set_nested_value(data: dict[str, Any], key: str, value: Any) -> None: for part in parts[:-1]: current = current.setdefault(part, {}) current[parts[-1]] = value - - -def _read_cache_entries(cache_path: Path) -> list[dict[str, Any]]: - """Read all entries from a cache file, decoding each line. - - Design decisions: - - Use file locking for multi-process safety - - Continue reading past malformed entries (partial data recovery) - - Return empty list on complete read failure (fail gracefully) - - Each line is base64-decoded before JSON parsing. - - Assumptions: - - Cache file contains newline-delimited base64-encoded entries (one per line) - - Each line is independent (one malformed line doesn't affect others) - - Empty or whitespace-only lines are skipped - """ - entries = [] - try: - with _exclusive_file_lock(cache_path, mode="r") as cache_file: - for raw_line in cache_file: - line = raw_line.strip() - if not line: - continue - try: - line = json.loads(_decode_cache_line(line)) - if isinstance(line, dict): - entries.append(line) - except Exception: - # Malformed line, skip and continue - continue - except Exception: - # If file cannot be opened or read, return empty list - return [] - return entries diff --git a/olive/telemetry/telemetry_extensions.py b/olive/telemetry/telemetry_extensions.py index e5b13395d0..7a1ad16233 100644 --- a/olive/telemetry/telemetry_extensions.py +++ b/olive/telemetry/telemetry_extensions.py @@ -6,11 +6,11 @@ import functools import inspect import time +import traceback from types import TracebackType from typing import Any, Callable, Optional, TypeVar -from olive.telemetry.telemetry import ACTION_EVENT_NAME, ERROR_EVENT_NAME, _get_logger -from olive.telemetry.utils import _format_exception_message +from olive.telemetry.telemetry import ACTION_EVENT_NAME, ERROR_EVENT_NAME, RECIPE_EVENT_NAME, _get_logger _TFunc = TypeVar("_TFunc", bound=Callable[..., Any]) @@ -45,6 +45,76 @@ def log_error( telemetry.log(ERROR_EVENT_NAME, attributes, metadata) +def log_recipe_result( + recipe_name: str, + success: bool, + metadata: Optional[dict[str, Any]] = None, +) -> None: + telemetry = _get_logger() + attributes = { + "recipe_name": recipe_name, + "success": success, + } + telemetry.log(RECIPE_EVENT_NAME, attributes, metadata) + + +def _redact_paths(text: str) -> str: + """Replace absolute filesystem paths with a non-identifying token. + + Keeps a trailing filename (one containing an extension) because it is useful + for debugging and is not personal data; drops everything else, including + paths whose last segment is itself a directory or username (e.g. /home/alice + or a UNC share root), which a bare-basename redaction would expose. + """ + import re + + # Windows drive paths (C:\Users\me\x), UNC paths (\\server\share\me\x), and + # POSIX absolute paths (/home/me/x). + pattern = re.compile( + r"(?:[A-Za-z]:\\[^\s\"']+)" + r"|(?:\\\\[^\s\"']+)" + r"|(?:/[^\s\"':]+(?:/[^\s\"':]+)+)" + ) + + def _redact(match: "re.Match") -> str: + base = match.group(0).replace("\\", "/").rstrip("/").rsplit("/", 1)[-1] + if base in (".", "..") or "." not in base: + return "" + return base + + return pattern.sub(_redact, text) + + +def _format_exception_message(ex: BaseException, tb: Optional[TracebackType] = None) -> str: + """Format an exception and strip local paths for privacy. + + Each entry from ``traceback.format_exception`` is a multi-line string (the + ``File "..."`` line plus the offending source line), so we process every + physical line: filenames are trimmed to a package-relative form, and any + absolute path that remains on a source or message line is redacted so a + username embedded in it cannot leak into OliveError. + """ + folder = "Olive" + file_line = 'File "' + formatted = traceback.format_exception(type(ex), ex, tb, limit=5) + lines = [] + for chunk in formatted: + for raw_line in chunk.splitlines(): + line_trunc = raw_line.strip() + if line_trunc.startswith(file_line) and folder in line_trunc: + idx = line_trunc.find(folder) + if idx != -1: + line_trunc = line_trunc[idx + len(folder) :] + elif line_trunc.startswith(file_line): + idx = line_trunc[len(file_line) :].find('"') + line_trunc = line_trunc[idx + len(file_line) :] + # Redact any absolute path that remains (source lines, message, and + # the tail of File lines). + line_trunc = _redact_paths(line_trunc) + lines.append(line_trunc) + return "\n".join(lines) + + def _resolve_invoked_from(skip_frames: int = 0) -> str: """Resolve how Olive was invoked by examining the call stack. @@ -123,13 +193,19 @@ def action(func: _TFunc) -> _TFunc: @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any): - invoked_from = _resolve_invoked_from() - action_name = func.__name__ - if args and hasattr(args[0], "__class__"): - cls_name = args[0].__class__.__name__ - cls_name = cls_name[: -len("Command")] if cls_name.endswith("Command") else cls_name - if cls_name: - action_name = cls_name if action_name == "run" else f"{cls_name}.{action_name}" + # Resolve telemetry context defensively: instrumentation (including + # inspect.stack()) must never propagate into the wrapped call. + try: + invoked_from = _resolve_invoked_from() + action_name = func.__name__ + if args and hasattr(args[0], "__class__"): + cls_name = args[0].__class__.__name__ + cls_name = cls_name[: -len("Command")] if cls_name.endswith("Command") else cls_name + if cls_name: + action_name = cls_name if action_name == "run" else f"{cls_name}.{action_name}" + except Exception: + invoked_from = "unknown" + action_name = getattr(func, "__name__", "unknown") start_time = time.perf_counter() success = True diff --git a/olive/telemetry/uploader.py b/olive/telemetry/uploader.py new file mode 100644 index 0000000000..43d5f21223 --- /dev/null +++ b/olive/telemetry/uploader.py @@ -0,0 +1,194 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- + +"""Background uploader that drains the SQLite offline store to OneCollector. + +Reads the oldest batch of events, POSTs them, and: +- deletes them on success (HTTP 2xx), +- deletes them on a permanent, non-retryable failure (e.g. HTTP 4xx) so a + poison event cannot block the queue forever, +- leaves them on a transient failure (network error / HTTP 5xx / timeout) to be + retried on the next cycle or the next process run. + +Durability is provided by the on-disk store, so the process can exit at any time +without losing events and without an exit-time flush. +""" + +import threading +import time +from typing import Optional + +from olive.telemetry.library.options import CompressionType, OneCollectorTransportOptions +from olive.telemetry.library.payload_builder import PayloadBuilder +from olive.telemetry.library.transport import HttpJsonPostTransport +from olive.telemetry.offline_store import OfflineEventStore +from olive.telemetry.process_lock import ProcessDrainLock + + +class EventUploader: + """Drains the offline store and ships events over HTTP on a daemon thread. + + When several processes share one telemetry database, a single-holder + advisory lock ensures only one of them drains at a time, so the same event + is never uploaded by two concurrent drainers. Processes that do not hold the + lock keep writing events durably; the holder drains them. + """ + + def __init__( + self, + store: OfflineEventStore, + instrumentation_key: str, + endpoint: str = OneCollectorTransportOptions.DEFAULT_ENDPOINT, + compression: CompressionType = CompressionType.DEFLATE, + drain_interval_seconds: float = 2.0, + max_items_per_drain: int = 256, + send_timeout_seconds: float = 10.0, + idle_backoff_seconds: float = 30.0, + ): + self._store = store + self._drain_interval = drain_interval_seconds + self._max_items = max_items_per_drain + self._send_timeout = send_timeout_seconds + self._idle_backoff = idle_backoff_seconds + self._drain_lock = ProcessDrainLock(store.db_path + ".lock") + + self._transport = HttpJsonPostTransport( + endpoint=endpoint, + ikey=instrumentation_key, + compression=compression, + ) + + self._wake = threading.Event() + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + + # ----- control ------------------------------------------------------- + + def start(self) -> None: + if self._thread is not None: + return + self._thread = threading.Thread(target=self._run, name="genai-telemetry-uploader", daemon=True) + self._thread.start() + + def request_drain(self) -> None: + """Nudge the uploader to drain promptly (e.g. after logging an event).""" + self._wake.set() + + def stop_loop(self, join_timeout_seconds: float = 5.0) -> bool: + """Stop the background loop and wait briefly for it to exit. + + Returns True if the thread actually stopped (so a caller may safely drain + as the sole drainer), False if it is still alive (e.g. stuck in an + in-flight send) — in which case the caller must NOT drain, to avoid + double-sending the rows the thread is still processing. + """ + self._stop.set() + self._wake.set() + thread = self._thread + if thread is None: + return True + thread.join(join_timeout_seconds) + stopped = not thread.is_alive() + if stopped: + self._thread = None + return stopped + + def signal_stop(self) -> None: + """Ask the loop to stop without blocking the caller. + + The daemon thread winds down on its next wake; the drain lock is released + when it exits (or by the OS at process exit). Use this for the opt-out + path so disabling telemetry never blocks the host. + """ + self._stop.set() + self._wake.set() + + def close(self) -> None: + """Release the single-drainer lock (urllib holds no persistent connection).""" + self._drain_lock.release() + + def stop(self, timeout_seconds: float = 12.0) -> None: + """Stop the loop and release the drain lock (convenience).""" + self.stop_loop(timeout_seconds) + self.close() + + # ----- draining ------------------------------------------------------ + + def drain_once(self) -> tuple[int, int]: + """Attempt to upload one batch. Returns (delivered_count, left_count). + + ``left_count`` is non-zero only when a transient failure leaves rows on + disk for a later retry; permanently-rejected rows are dropped (counted as + delivered for loop-termination purposes since they leave the queue). + """ + batch = self._store.get_batch(self._max_items) + if not batch: + return (0, 0) + + builder = PayloadBuilder( + max_size_bytes=OneCollectorTransportOptions.DEFAULT_MAX_PAYLOAD_SIZE_BYTES, + max_items=OneCollectorTransportOptions.DEFAULT_MAX_ITEMS_PER_PAYLOAD, + ) + included: list[int] = [] + for row_id, payload in batch: + if not builder.can_add(payload) and not builder.is_empty: + break + builder.add(payload) + included.append(row_id) + payload_bytes = builder.build() + + try: + success, status = self._transport.send(payload_bytes, self._send_timeout, item_count=len(included)) + except Exception: + success, status = (False, None) + + if success: + self._store.delete(included) + return (len(included), 0) + if not HttpJsonPostTransport.is_retryable(status): + # Permanent rejection (e.g. 4xx): drop so it can't block the queue. + self._store.delete(included) + return (len(included), 0) + # Transient failure: leave the rows for the next attempt. + return (0, len(included)) + + def flush(self, max_seconds: float = 5.0) -> None: + """Best-effort drain of all pending events, bounded by max_seconds. + + Only drains if this process holds the single-drainer lock; otherwise the + events stay durably on disk for the lock holder (or the next run). + """ + if not self._drain_lock.acquire(): + return + deadline = time.time() + max_seconds + while time.time() < deadline: + delivered, left = self.drain_once() + if delivered == 0 and left == 0: + return # queue empty + if left: + return # transient failure; leave the rest for next run + + def _run(self) -> None: + try: + while not self._stop.is_set(): + transient_failure = 0 + # Only one process drains at a time. If another holds the lock, skip + # draining this cycle; our events remain durable for the holder. + if self._drain_lock.acquire(): + try: + delivered, left = self.drain_once() + while delivered > 0 and not self._stop.is_set(): + delivered, left = self.drain_once() + transient_failure = left + except Exception: + transient_failure = 1 + + wait = self._idle_backoff if transient_failure else self._drain_interval + self._wake.wait(wait) + self._wake.clear() + finally: + # Release the single-drainer lock when the loop exits so another + # process can take over (also released by close()/OS on exit). + self._drain_lock.release() diff --git a/olive/telemetry/utils.py b/olive/telemetry/utils.py index 52a39acded..ee4283358f 100644 --- a/olive/telemetry/utils.py +++ b/olive/telemetry/utils.py @@ -2,15 +2,11 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. # -------------------------------------------------------------------------- -import base64 import functools import os import platform import tempfile -import traceback from pathlib import Path -from types import TracebackType -from typing import Optional ORT_SUPPORT_DIR = r"Microsoft/DeveloperTools/.onnxruntime" @@ -48,98 +44,3 @@ def get_telemetry_base_dir() -> Path: cache_dir = str(_resolve_home_dir() / ".cache") return Path(cache_dir).expanduser() / ORT_SUPPORT_DIR - - -def _format_exception_message(ex: BaseException, tb: Optional[TracebackType] = None) -> str: - """Format an exception and trim local paths for readability.""" - folder = "Olive" - file_line = 'File "' - formatted = traceback.format_exception(type(ex), ex, tb, limit=5) - lines = [] - for line in formatted: - line_trunc = line.strip() - if line_trunc.startswith(file_line) and folder in line_trunc: - idx = line_trunc.find(folder) - if idx != -1: - line_trunc = line_trunc[idx + len(folder) :] - elif line_trunc.startswith(file_line): - idx = line_trunc[len(file_line) :].find('"') - line_trunc = line_trunc[idx + len(file_line) :] - lines.append(line_trunc) - return "\n".join(lines) - - -class _ExclusiveFileLock: - """Cross-platform exclusive file lock context manager. - - Uses fcntl on Unix/Linux/macOS, msvcrt on Windows. - Prevents cache corruption when multiple processes access the same file. - - Design decisions: - - Lock is held for the entire duration of file access (prevents partial reads/writes) - - Lock is released automatically on close (even on exceptions) - - Platform-specific implementation (fcntl for POSIX, msvcrt for Windows) - - Assumptions: - - File locking is supported on the platform - - Lock is advisory on some systems (cooperative locking) - """ - - def __init__(self, file_path: Path, mode: str): - self.file_path = file_path - self.mode = mode - self.file = None - - def __enter__(self): - self.file = open(self.file_path, self.mode, encoding="utf-8") - - try: - # Platform-specific locking - if os.name == "posix": - import fcntl - - fcntl.flock(self.file.fileno(), fcntl.LOCK_EX) - elif os.name == "nt": - import msvcrt - - # Lock 1 byte at position 0 - msvcrt.locking(self.file.fileno(), msvcrt.LK_LOCK, 1) - except Exception: - self.file.close() - self.file = None - raise - - return self.file - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.file: - # Unlock happens automatically on close - self.file.close() - - -def _exclusive_file_lock(file_path: Path, mode: str): - """Create an exclusive file lock context manager. - - :param file_path: Path to the file to lock. - :param mode: File open mode ('r', 'a', 'w', etc.). - :return: Context manager that returns an open file handle. - """ - return _ExclusiveFileLock(file_path, mode) - - -def _encode_cache_line(plaintext: str) -> str: - """Encode a single cache line using base64. - - :param plaintext: The plaintext string to encode. - :return: Base64-encoded string (safe for a single text line). - """ - return base64.b64encode(plaintext.encode("utf-8")).decode("ascii") - - -def _decode_cache_line(encoded: str) -> str: - """Decode a single base64-encoded cache line. - - :param encoded: The base64-encoded string. - :return: The decoded plaintext string. - """ - return base64.b64decode(encoded.encode("ascii")).decode("utf-8") diff --git a/olive/workflows/run/run.py b/olive/workflows/run/run.py index 89100e1c1c..b997fcdc8b 100644 --- a/olive/workflows/run/run.py +++ b/olive/workflows/run/run.py @@ -5,7 +5,7 @@ import logging from copy import deepcopy from pathlib import Path -from typing import TYPE_CHECKING, Optional, Union +from typing import TYPE_CHECKING, Any, Optional, Union from olive.common.utils import set_tempdir from olive.hardware.constants import ExecutionProvider @@ -13,6 +13,8 @@ from olive.package_config import OlivePackageConfig from olive.systems.accelerator_creator import create_accelerator from olive.systems.common import SystemType +from olive.telemetry.recipe_telemetry import _build_recipe_result_metadata, _load_config_input_for_telemetry +from olive.telemetry.telemetry_extensions import _format_exception_message, log_error, log_recipe_result from olive.workflows.run.config import RunConfig if TYPE_CHECKING: @@ -152,30 +154,71 @@ def run( list_required_packages: bool = False, package_config: Optional[Union[str, Path, dict]] = None, tempdir: Optional[Union[str, Path]] = None, + recipe_telemetry_metadata: Optional[dict[str, Any]] = None, + emit_recipe_telemetry: bool = True, ): # set tempdir set_tempdir(tempdir) + package_config_input = package_config + try: + package_config_telemetry_input = ( + _load_config_input_for_telemetry(package_config_input) if package_config_input is not None else None + ) + except Exception: + package_config_telemetry_input = None + + package_config_provided = package_config is not None if package_config is None: package_config = OlivePackageConfig.get_default_config_path() - package_config = OlivePackageConfig.parse_file_or_obj(package_config) - run_config: RunConfig = RunConfig.parse_file_or_obj(run_config) - - if list_required_packages: - # set the log level to INFO for packages - set_verbosity_info() - required_packages = get_required_packages(package_config, run_config) - generate_files_from_packages(required_packages, "olive_requirements.txt") - return None - - if run_config.engine.host and run_config.engine.host.type == SystemType.Docker: - docker_system = run_config.engine.host.create_system() - return docker_system.run_workflow(run_config) - - # set log level for olive - set_default_logger_severity(run_config.engine.log_severity_level) - return run_engine(package_config, run_config) + parsed_run_config = None + success = False + exception = None + try: + package_config = OlivePackageConfig.parse_file_or_obj(package_config) + parsed_run_config = RunConfig.parse_file_or_obj(run_config) + + if list_required_packages: + # set the log level to INFO for packages + set_verbosity_info() + required_packages = get_required_packages(package_config, parsed_run_config) + generate_files_from_packages(required_packages, "olive_requirements.txt") + success = True + return None + + if parsed_run_config.engine.host and parsed_run_config.engine.host.type == SystemType.Docker: + docker_system = parsed_run_config.engine.host.create_system() + workflow_output = docker_system.run_workflow(deepcopy(parsed_run_config)) + success = True + return workflow_output + + # set log level for olive + set_default_logger_severity(parsed_run_config.engine.log_severity_level) + workflow_output = run_engine(package_config, parsed_run_config) + success = True + return workflow_output + except Exception as exc: + exception = exc + raise + finally: + if exception is not None: + log_error( + exception_type=type(exception).__name__, + exception_message=_format_exception_message(exception, exception.__traceback__), + ) + if emit_recipe_telemetry: + metadata = _build_recipe_result_metadata( + run_config, + None, + parsed_run_config, + recipe_telemetry_metadata, + list_required_packages=list_required_packages, + package_config_input=package_config_telemetry_input, + package_config_provided=package_config_provided, + ) + recipe_name = metadata.pop("recipe_name") + log_recipe_result(recipe_name, success=success, metadata=metadata) def generate_files_from_packages(packages, file_name): diff --git a/requirements.txt b/requirements.txt index 1032c72f97..cfb5a1b9de 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ numpy onnx onnx_ir>=0.1.2 onnxscript>=0.5.3 -opentelemetry-sdk>=1.39.1 optuna pandas pydantic>=2.0 diff --git a/test/cli/test_cli.py b/test/cli/test_cli.py index 59dee0557c..e0b75cac19 100644 --- a/test/cli/test_cli.py +++ b/test/cli/test_cli.py @@ -108,7 +108,17 @@ def test_workflow_run_command(mock_run, tempdir, list_required_packages, tmp_pat # assert mock_run.assert_called_once_with( - {"key": "value"}, package_config=None, tempdir=tempdir, list_required_packages=list_required_packages + {"key": "value"}, + package_config=None, + tempdir=tempdir, + list_required_packages=list_required_packages, + recipe_telemetry_metadata={ + "recipe_command": "WorkflowRun", + "recipe_source": "config_file", + "recipe_format": "json", + "execution_mode": "list_required_packages" if list_required_packages else "run", + "package_config_provided": False, + }, ) @@ -150,6 +160,22 @@ def test_workflow_run_command_with_overrides(mock_repo_exists, mock_run, tmp_pat list_required_packages=False, package_config=None, tempdir=None, + recipe_telemetry_metadata={ + "recipe_command": "WorkflowRun", + "recipe_source": "config_file", + "recipe_format": "json", + "execution_mode": "run", + "package_config_provided": False, + "config_overrides": { + "input_model": { + "type": "HfModel", + "model_path": "hf-internal-testing/tiny-random-LlamaForCausalLM", + "load_kwargs": {"attn_implementation": "eager", "trust_remote_code": False}, + }, + "output_dir": str(Path("new_output_path").resolve()), + "log_severity_level": 2, + }, + }, ) @@ -193,6 +219,13 @@ def test_workflow_run_command_with_test_override(mock_run, tmp_path): list_required_packages=False, package_config=None, tempdir=None, + recipe_telemetry_metadata={ + "recipe_command": "WorkflowRun", + "recipe_source": "config_file", + "recipe_format": "json", + "execution_mode": "run", + "package_config_provided": False, + }, ) diff --git a/test/conftest.py b/test/conftest.py index db97c685af..71aa6d59c1 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -43,5 +43,18 @@ def maybe_patch_inc(): @pytest.fixture(scope="session", autouse=True) -def disable_telemetry(): - Telemetry().disable_telemetry() +def disable_telemetry(tmp_path_factory): + # Keep telemetry fully inert during tests. The device-id heartbeat is now + # durable, so simply constructing Telemetry() would enqueue one to the real + # store and the uploader would try to send it. Redirect the store to a + # throwaway directory and stub the HTTP transport so no test run writes to + # the real telemetry store or reaches the network. + import olive.telemetry.library.transport as transport_module + import olive.telemetry.telemetry as telemetry_module + + telemetry_dir = tmp_path_factory.mktemp("telemetry") + with patch.object(telemetry_module, "get_telemetry_base_dir", lambda: str(telemetry_dir)), patch.object( + transport_module.HttpJsonPostTransport, "send", lambda *args, **kwargs: (True, 204) + ): + Telemetry().disable_telemetry() + yield diff --git a/test/systems/docker/test_docker_system.py b/test/systems/docker/test_docker_system.py index 5430b68587..ef20a43d18 100644 --- a/test/systems/docker/test_docker_system.py +++ b/test/systems/docker/test_docker_system.py @@ -2,6 +2,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. # -------------------------------------------------------------------------- +import json from unittest.mock import MagicMock, patch import pytest @@ -140,6 +141,37 @@ def test_run_workflow(self, mock_find_resources, mock_tempdir, mock_from_env, tm # Verify cleanup mock_container.remove.assert_called_once() + @patch("olive.systems.docker.docker_system.docker.from_env") + def test_prepare_environment_forwards_ci_to_workflow_container(self, mock_from_env, monkeypatch): + mock_docker_client = MagicMock() + mock_from_env.return_value = mock_docker_client + mock_docker_client.images.get.return_value = MagicMock() + monkeypatch.setenv("TF_BUILD", "True") + docker_config = self.get_default_docker_config() + docker_system = DockerSystem( + image_name=docker_config.image_name, + build_context_path=docker_config.build_context_path, + dockerfile=docker_config.dockerfile, + work_dir=docker_config.work_dir, + ) + + environment = docker_system._prepare_environment({}) + + assert environment["CI"] == "1" + + def test_workflow_runner_disables_inner_recipe_telemetry(self, tmp_path, monkeypatch): + from olive.systems.docker import workflow_runner + + monkeypatch.delenv("HF_TOKEN", raising=False) + config = {"input_model": {"type": "ONNXModel", "model_path": "model.onnx"}} + config_path = tmp_path / "config.json" + config_path.write_text(json.dumps(config)) + + with patch.object(workflow_runner, "olive_run") as mock_olive_run: + workflow_runner.runner_entry(config_path) + + mock_olive_run.assert_called_once_with(config, emit_recipe_telemetry=False) + @patch("olive.systems.docker.docker_system.docker.from_env") @patch("olive.systems.docker.docker_system.tempfile.TemporaryDirectory") @patch("olive.systems.docker.docker_system.find_all_resources") diff --git a/test/test_telemetry.py b/test/test_telemetry.py new file mode 100644 index 0000000000..5aec3664c1 --- /dev/null +++ b/test/test_telemetry.py @@ -0,0 +1,473 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- +# pylint: disable=protected-access +"""Tests for the SQLite-backed telemetry pipeline. + +Covers the three-state opt-out semantics (CI / user opt-out / enabled), the +ALLOWED_KEYS whitelist filtering, the durable SQLite store, the single-drainer +process lock, the background uploader's success/poison/transient handling, and +the Common Schema serialization helpers. No test touches the network or the real +user profile: the HTTP transport is stubbed and the store directory is +redirected to a temp dir. +""" + +import json +import tempfile +from types import SimpleNamespace + +import pytest + +import olive.telemetry.library.transport as transport_mod +import olive.telemetry.telemetry as tmod +from olive.telemetry.library.connection_string_parser import ConnectionStringParser +from olive.telemetry.library.serialization import CommonSchemaJsonSerializationHelper as Serializer +from olive.telemetry.offline_store import SCHEMA_VERSION, OfflineEventStore +from olive.telemetry.process_lock import ProcessDrainLock +from olive.telemetry.telemetry import ( + ACTION_EVENT_NAME, + ERROR_EVENT_NAME, + HEARTBEAT_EVENT_NAME, + RECIPE_EVENT_NAME, + Telemetry, + is_ci_environment, +) +from olive.telemetry.uploader import EventUploader + +_OPT_OUT_VAR = "OLIVE_DISABLE_TELEMETRY" +_CI_VARS = ( + "CI", + "TF_BUILD", + "GITHUB_ACTIONS", + "JENKINS_URL", + "CODEBUILD_BUILD_ID", + "BUILDKITE", + "SYSTEM_TEAMFOUNDATIONCOLLECTIONURI", +) + + +@pytest.fixture +def tenv(tmp_path, monkeypatch): + """Hermetic telemetry environment. + + Clears CI/opt-out signals so each test sets its own mode, stubs the HTTP + transport (recording every send in ``.sends``), and redirects the durable + store off the real profile. On teardown the heartbeat thread is joined + BEFORE monkeypatch restores the real transport, so a lagging heartbeat can + never POST real device data from a test. + """ + Telemetry._instance = None + for var in (_OPT_OUT_VAR, *_CI_VARS): + monkeypatch.delenv(var, raising=False) + + sends = [] + + def _record_send(self, payload, timeout_sec, item_count=1): + sends.append({"item_count": item_count, "size": len(payload), "payload": payload}) + return True, 204 + + monkeypatch.setattr(transport_mod.HttpJsonPostTransport, "send", _record_send) + monkeypatch.setattr(tmod, "get_telemetry_base_dir", lambda: str(tmp_path)) + + yield SimpleNamespace(sends=sends, tmp_path=tmp_path) + + inst = Telemetry._instance + if inst is not None: + uploader = getattr(inst, "_uploader", None) + if uploader is not None: + uploader.stop_loop(5) + heartbeat = getattr(inst, "_heartbeat_thread", None) + if heartbeat is not None: + heartbeat.join() + Telemetry._instance = None + + +def _quiesce(t): + """Join the heartbeat (so it is enqueued) and drain the uploader so the + recorded sends and store counts are deterministic.""" + heartbeat = getattr(t, "_heartbeat_thread", None) + if heartbeat is not None: + heartbeat.join() + if t._uploader is not None: + t._uploader.stop_loop(5) + for _ in range(20): + if t._store is None or t._store.count() == 0: + break + t._uploader.drain_once() + + +def _sent_event_names(sends): + names = [] + for s in sends: + payload = bytes(s["payload"]) + for token in (b"OliveHeartbeat", b"OliveRecipe", b"OliveAction", b"OliveError"): + if token in payload: + names.append(token.decode()) + return names + + +# -------------------------------------------------------------------------- +# Three-state opt-out semantics +# -------------------------------------------------------------------------- + + +def test_ci_is_recipe_only_with_no_heartbeat(tenv, monkeypatch): + monkeypatch.setenv("CI", "1") + t = Telemetry() + + # CI suppresses the device-id heartbeat but still persists recipe events. + assert t._heartbeat_thread is None + assert t._store is not None + + before = t._store.count() + t.log(RECIPE_EVENT_NAME, {"recipe_name": "r", "success": True}) + assert t._store.count() == before + 1 + + middle = t._store.count() + t.log(ACTION_EVENT_NAME, {"invoked_from": "cli", "action_name": "x", "duration_ms": 1.0, "success": True}) + assert t._store.count() == middle # non-recipe events suppressed in CI + + _quiesce(t) + names = _sent_event_names(tenv.sends) + assert "OliveHeartbeat" not in names + assert "OliveRecipe" in names + + +def test_user_opt_out_records_heartbeat_only(tenv, monkeypatch): + monkeypatch.setenv(_OPT_OUT_VAR, "1") + t = Telemetry() + + # Detailed events are not recorded, but the heartbeat is durably queued. + assert t._enabled is False + assert t._store is not None + assert t._heartbeat_thread is not None + + # Detailed-event methods are no-ops and must not raise. + t.log(ACTION_EVENT_NAME, {"invoked_from": "cli", "action_name": "x", "duration_ms": 1.0, "success": True}) + + _quiesce(t) + names = _sent_event_names(tenv.sends) + assert "OliveHeartbeat" in names + assert "OliveAction" not in names + + +def test_opt_out_and_ci_send_nothing(tenv, monkeypatch): + monkeypatch.setenv(_OPT_OUT_VAR, "1") + monkeypatch.setenv("CI", "1") + t = Telemetry() + _quiesce(t) + + # Explicit opt-out + CI: record and send nothing at all. + assert t._enabled is False + assert t._store is None + assert t._heartbeat_thread is None + assert tenv.sends == [] + + +def test_enabled_records_heartbeat_and_events(tenv): + t = Telemetry() + + assert t._enabled is True + assert t._store is not None + + t.log(ACTION_EVENT_NAME, {"invoked_from": "cli", "action_name": "x", "duration_ms": 1.0, "success": True}) + + _quiesce(t) + names = _sent_event_names(tenv.sends) + assert "OliveHeartbeat" in names + assert "OliveAction" in names + + +def test_disable_telemetry_stops_detailed_events(tenv): + t = Telemetry() + _quiesce(t) + t.disable_telemetry() + + assert t._enabled is False + before = t._store.count() if t._store is not None else 0 + t.log(ACTION_EVENT_NAME, {"invoked_from": "cli", "action_name": "x", "duration_ms": 1.0, "success": True}) + after = t._store.count() if t._store is not None else 0 + assert after == before + + +# -------------------------------------------------------------------------- +# Whitelist filtering / payload building +# -------------------------------------------------------------------------- + + +def test_build_payload_drops_non_whitelisted_keys(tenv): + t = Telemetry() + _quiesce(t) + + payload = t._build_payload( + ACTION_EVENT_NAME, + { + "invoked_from": "cli", + "action_name": "WorkflowRun", + "duration_ms": 1.0, + "success": True, + "secret": "SHOULD_NOT_BE_SENT", + }, + ) + data = json.loads(payload)["data"] + assert "secret" not in data + assert data["action_name"] == "WorkflowRun" + # Defaults are stamped on every event. + assert data["app_version"] + assert data["app_instance_id"] + + +def test_build_payload_returns_none_for_unknown_event(tenv): + t = Telemetry() + _quiesce(t) + assert t._build_payload("TotallyUnknownEvent", {"k": "v"}) is None + + +def test_build_payload_heartbeat_uses_flat_os_fields(tenv): + t = Telemetry() + _quiesce(t) + + payload = t._build_payload( + HEARTBEAT_EVENT_NAME, + { + "device_id": "DEVICE", + "device_id_status": "ok", + "os": "Windows", + "os_version": "10.0.22631", + "os_release": "11", + "os_arch": "AMD64", + "leak": "DROP", + }, + ) + data = json.loads(payload)["data"] + assert data["device_id"] == "DEVICE" + assert data["device_id_status"] == "ok" + assert data["os"] == "Windows" + assert data["os_version"] == "10.0.22631" + assert "leak" not in data + + +def test_global_metadata_is_merged_then_filtered(tenv): + t = Telemetry() + _quiesce(t) + + # app_version is whitelisted for actions; not_allowed is not. + t.add_global_metadata({"app_version": "9.9.9", "not_allowed": "DROP"}) + payload = t._build_payload( + ACTION_EVENT_NAME, + {"invoked_from": "cli", "action_name": "x", "duration_ms": 1.0, "success": True}, + ) + data = json.loads(payload)["data"] + assert data["app_version"] == "9.9.9" + assert "not_allowed" not in data + + +def test_error_event_whitelist(tenv): + t = Telemetry() + _quiesce(t) + payload = t._build_payload( + ERROR_EVENT_NAME, + {"exception_type": "RuntimeError", "exception_message": "boom", "stack": "SENSITIVE"}, + ) + data = json.loads(payload)["data"] + assert data["exception_type"] == "RuntimeError" + assert data["exception_message"] == "boom" + assert "stack" not in data + + +# -------------------------------------------------------------------------- +# CI detection +# -------------------------------------------------------------------------- + + +def test_is_ci_environment(monkeypatch): + for var in (_OPT_OUT_VAR, *_CI_VARS): + monkeypatch.delenv(var, raising=False) + assert is_ci_environment() is False + monkeypatch.setenv("GITHUB_ACTIONS", "true") + assert is_ci_environment() is True + + +# -------------------------------------------------------------------------- +# Durable SQLite store +# -------------------------------------------------------------------------- + + +def _new_store(**kwargs): + import os + + db = os.path.join(tempfile.mkdtemp(), "olive_telemetry.db") + return OfflineEventStore(db, **kwargs) + + +def test_store_is_fifo(): + store = _new_store() + for i in range(5): + store.store(f'{{"e":{i}}}'.encode()) + assert store.count() == 5 + batch = store.get_batch(3) + assert [payload for _, payload in batch] == [b'{"e":0}', b'{"e":1}', b'{"e":2}'] + + +def test_store_delete(): + store = _new_store() + store.store(b'{"a":1}') + store.store(b'{"b":2}') + ids = [row_id for row_id, _ in store.get_batch(10)] + store.delete(ids[:1]) + assert store.count() == 1 + + +def test_store_trims_over_capacity(): + store = _new_store(max_records=8) + for i in range(40): + store.store(f'{{"i":{i}}}'.encode()) + assert store.count() <= 8 + + +def test_store_rejects_empty_payload(): + store = _new_store() + assert store.store(b"") is False + + +def test_store_stamps_schema_version(): + import sqlite3 + + store = _new_store() + version = sqlite3.connect(store.db_path).execute("PRAGMA user_version").fetchone()[0] + assert version == SCHEMA_VERSION + + +# -------------------------------------------------------------------------- +# Single-drainer process lock +# -------------------------------------------------------------------------- + + +def _lock_path(): + import os + + return os.path.join(tempfile.mkdtemp(), "olive_telemetry.db.lock") + + +def test_lock_is_mutually_exclusive(): + path = _lock_path() + a = ProcessDrainLock(path) + b = ProcessDrainLock(path) + assert a.acquire() is True + assert b.acquire() is False # held by a + a.release() + assert b.acquire() is True # released + b.release() + + +def test_lock_reacquire_is_idempotent(): + a = ProcessDrainLock(_lock_path()) + assert a.acquire() is True + assert a.acquire() is True # already held + assert a.held is True + a.release() + assert a.held is False + + +# -------------------------------------------------------------------------- +# Uploader drain classification (no real network) +# -------------------------------------------------------------------------- + + +def _store_and_uploader(): + import os + + db = os.path.join(tempfile.mkdtemp(), "olive_telemetry.db") + store = OfflineEventStore(db) + uploader = EventUploader(store, instrumentation_key="abc-def") + return store, uploader + + +def test_uploader_deletes_on_success(): + store, uploader = _store_and_uploader() + store.store(b'{"ok":1}') + uploader._transport.send = lambda *a, **k: (True, 204) + delivered, left = uploader.drain_once() + assert (delivered, left) == (1, 0) + assert store.count() == 0 + + +def test_uploader_drops_poison_4xx(): + store, uploader = _store_and_uploader() + store.store(b'{"bad":1}') + uploader._transport.send = lambda *a, **k: (False, 400) + uploader.drain_once() + assert store.count() == 0 # dropped, not retried forever + + +def test_uploader_retains_transient_5xx(): + store, uploader = _store_and_uploader() + store.store(b'{"later":1}') + uploader._transport.send = lambda *a, **k: (False, 503) + delivered, left = uploader.drain_once() + assert (delivered, left) == (0, 1) + assert store.count() == 1 # kept for retry + + +# -------------------------------------------------------------------------- +# Serialization + connection string parsing +# -------------------------------------------------------------------------- + + +def test_serialize_basic_types(): + assert Serializer.serialize_value(None) is None + assert Serializer.serialize_value(True) is True + assert Serializer.serialize_value(42) == 42 + assert Serializer.serialize_value("hello") == "hello" + assert Serializer.serialize_value([1, "two", 3.0]) == [1, "two", 3.0] + assert Serializer.serialize_value({"k": "v"}) == {"k": "v"} + + +def test_create_event_envelope(): + from datetime import datetime, timezone + + envelope = Serializer.create_event_envelope( + event_name="TestEvent", + timestamp=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + ikey="o:test-key", + data={"key": "value"}, + ) + assert envelope["name"] == "TestEvent" + assert envelope["iKey"] == "o:test-key" + assert envelope["data"] == {"key": "value"} + + +def test_connection_string_parser(): + assert ConnectionStringParser("InstrumentationKey=abc-def-ghi").instrumentation_key == "abc-def-ghi" + with pytest.raises(ValueError): + ConnectionStringParser("") + with pytest.raises(ValueError): + ConnectionStringParser("SomeOtherKey=value") + + +# -------------------------------------------------------------------------- +# Exception-message path redaction (privacy) +# -------------------------------------------------------------------------- + + +def test_redact_paths_keeps_filenames_drops_usernames(): + from olive.telemetry.telemetry_extensions import _redact_paths + + assert _redact_paths(r"C:\Users\alice\model.onnx") == "model.onnx" + assert _redact_paths("/var/data/run/output.log") == "output.log" + # Last segment is a directory/username (no extension) -> fully redacted. + assert _redact_paths("/home/bob") == "" + # UNC paths are redacted too. + assert _redact_paths(r"\\server\share\secret") == "" + + +def test_format_exception_message_redacts_paths_in_message(): + from olive.telemetry.telemetry_extensions import _format_exception_message + + try: + raise RuntimeError(r"failed to read C:\Users\alice\secret\weights.bin") + except RuntimeError as exc: + message = _format_exception_message(exc, exc.__traceback__) + assert "alice" not in message + assert "weights.bin" in message diff --git a/test/workflows/test_workflow_run.py b/test/workflows/test_workflow_run.py index 82cc4980bf..6af0118374 100644 --- a/test/workflows/test_workflow_run.py +++ b/test/workflows/test_workflow_run.py @@ -1,10 +1,16 @@ +import json import sys from copy import deepcopy from pathlib import Path -from unittest.mock import patch +from unittest.mock import Mock, patch import pytest +from olive.telemetry.recipe_telemetry import ( + _build_recipe_hash, + _classify_input_model_source, + _classify_run_config_source, +) from olive.workflows import run as olive_run from test.utils import ( get_pytorch_model, @@ -125,3 +131,313 @@ def test_run_packages(): # cleanup requirements_file_path.unlink() + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +@patch("olive.telemetry.recipe_telemetry.is_ci_environment", return_value=False) +def test_run_logs_recipe_result_success(_, mock_run_engine, mock_log_recipe_result): + config = { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + "load_kwargs": {"attn_implementation": "eager"}, + }, + "systems": { + "local_system": { + "type": "LocalSystem", + "accelerators": [{"device": "gpu", "execution_providers": ["CUDAExecutionProvider"]}], + } + }, + "engine": {"target": "local_system"}, + "passes": {"dynamic_quant": {"type": "OnnxDynamicQuantization"}}, + } + expected_output = object() + mock_run_engine.return_value = expected_output + + output = olive_run( + config, + recipe_telemetry_metadata={ + "recipe_name": "Quantize", + "recipe_command": "Quantize", + "recipe_source": "generated_cli", + "recipe_format": "generated", + }, + ) + + assert output is expected_output + mock_log_recipe_result.assert_called_once() + assert mock_log_recipe_result.call_args.args[0] == "Quantize" + assert mock_log_recipe_result.call_args.kwargs["success"] is True + + metadata = mock_log_recipe_result.call_args.kwargs["metadata"] + assert metadata["recipe_command"] == "Quantize" + assert metadata["recipe_source"] == "generated_cli" + assert metadata["recipe_format"] == "generated" + assert metadata["workflow_id"] == "default_workflow" + assert metadata["input_model_type"] == "hfmodel" + assert metadata["input_model_source"] == "string_name" + assert metadata["model_task"] == "text-generation" + assert metadata["target_system_type"] == "LocalSystem" + assert metadata["target_device"] == "gpu" + assert metadata["target_execution_provider"] == "CUDAExecutionProvider" + assert metadata["target_execution_providers"] == "CUDAExecutionProvider" + assert metadata["host_system_type"] == "LocalSystem" + assert "host_device" not in metadata + assert "host_execution_provider" not in metadata + assert "host_execution_providers" not in metadata + assert metadata["pass_types"] == "onnxdynamicquantization" + assert metadata["pass_count"] == 1 + assert metadata["data_config_count"] == 0 + assert metadata["search_enabled"] is False + assert metadata["package_config_provided"] is False + assert metadata["is_ci"] is False + assert metadata["recipe_hash"] + assert "input_model_name_hash" not in metadata + assert "config_overrides" not in metadata + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +def test_run_logs_config_overrides_when_recipe_metadata_provides_overrides(mock_run_engine, mock_log_recipe_result): + config = { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + } + } + mock_run_engine.return_value = object() + + olive_run( + config, + recipe_telemetry_metadata={ + "recipe_name": "WorkflowRun", + "config_overrides": { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + }, + "engine": {"target": "local_system"}, + "data_path": Path("data"), + }, + }, + ) + + metadata = mock_log_recipe_result.call_args.kwargs["metadata"] + config_overrides = json.loads(metadata["config_overrides"]) + assert config_overrides["input_model"]["model_path"] == "Qwen/Qwen2.5-0.5B-Instruct" + assert config_overrides["engine"]["target"] == "" + assert config_overrides["data_path"] == "" + + +@patch("olive.workflows.run.run.log_error") +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +def test_run_logs_recipe_result_failure(mock_run_engine, mock_log_recipe_result, mock_log_error): + config = { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + "load_kwargs": {"attn_implementation": "eager"}, + }, + "passes": {"dynamic_quant": {"type": "OnnxDynamicQuantization"}}, + } + mock_run_engine.side_effect = ValueError("recipe failed") + + with pytest.raises(ValueError, match="recipe failed"): + olive_run( + config, + recipe_telemetry_metadata={ + "recipe_name": "Quantize", + "recipe_command": "Quantize", + "recipe_source": "generated_cli", + "recipe_format": "generated", + }, + ) + + mock_log_recipe_result.assert_called_once() + assert mock_log_recipe_result.call_args.args[0] == "Quantize" + assert mock_log_recipe_result.call_args.kwargs["success"] is False + assert "exception_type" not in mock_log_recipe_result.call_args.kwargs + mock_log_error.assert_called_once() + assert mock_log_error.call_args.kwargs["exception_type"] == "ValueError" + assert "recipe failed" in mock_log_error.call_args.kwargs["exception_message"] + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +def test_run_skips_recipe_result_when_recipe_telemetry_is_not_emitted(mock_run_engine, mock_log_recipe_result): + expected_output = object() + mock_run_engine.return_value = expected_output + + output = olive_run( + { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + } + }, + emit_recipe_telemetry=False, + ) + + assert output is expected_output + mock_log_recipe_result.assert_not_called() + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.systems.system_config.SystemConfig.create_system") +def test_run_logs_single_parent_recipe_result_for_docker_host(mock_create_system, mock_log_recipe_result): + expected_output = object() + docker_system = Mock() + + def run_workflow(container_run_config): + container_run_config.engine.host = container_run_config.engine.target + return expected_output + + docker_system.run_workflow.side_effect = run_workflow + mock_create_system.return_value = docker_system + config = { + "input_model": {"type": "ONNXModel", "model_path": "model.onnx"}, + "systems": { + "docker_system": { + "type": "Docker", + "config": { + "dockerfile": "Dockerfile", + "build_context_path": "build_context", + "image_name": "test-image:latest", + "work_dir": "/olive-ws", + }, + }, + "local_system": {"type": "LocalSystem"}, + }, + "engine": {"host": "docker_system", "target": "local_system"}, + } + + output = olive_run(config) + + assert output is expected_output + mock_log_recipe_result.assert_called_once() + metadata = mock_log_recipe_result.call_args.kwargs["metadata"] + assert metadata["host_system_type"] == "Docker" + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +def test_run_logs_recipe_host_metadata_without_explicit_target(mock_run_engine, mock_log_recipe_result): + config = { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + "load_kwargs": {"attn_implementation": "eager"}, + }, + "systems": { + "host_system": { + "type": "LocalSystem", + "accelerators": [{"device": "cpu", "execution_providers": ["CPUExecutionProvider"]}], + } + }, + "engine": {"host": "host_system"}, + } + mock_run_engine.return_value = object() + + olive_run( + config, + recipe_telemetry_metadata={ + "recipe_name": "Quantize", + "recipe_command": "Quantize", + "recipe_source": "generated_cli", + "recipe_format": "generated", + }, + ) + + metadata = mock_log_recipe_result.call_args.kwargs["metadata"] + assert "target_system_type" not in metadata + assert "target_device" not in metadata + assert "target_execution_provider" not in metadata + assert "target_execution_providers" not in metadata + assert metadata["host_system_type"] == "LocalSystem" + assert metadata["host_device"] == "cpu" + assert metadata["host_execution_provider"] == "CPUExecutionProvider" + assert metadata["host_execution_providers"] == "CPUExecutionProvider" + + +@patch("olive.workflows.run.run.log_recipe_result") +@patch("olive.workflows.run.run.run_engine") +def test_run_logs_package_config_overrides_when_package_config_provided(mock_run_engine, mock_log_recipe_result): + config = { + "input_model": { + "type": "HfModel", + "model_path": "Qwen/Qwen2.5-0.5B-Instruct", + "task": "text-generation", + "load_kwargs": {"attn_implementation": "eager"}, + } + } + mock_run_engine.return_value = object() + + olive_run( + config, + package_config={ + "passes": { + "AddOliveMetadata": { + "module_path": "olive.passes.onnx.add_metadata.AddOliveMetadata", + "supported_providers": ["CPUExecutionProvider"], + } + }, + "extra_dependencies": {"custom_accelerator": ["custom-package"]}, + }, + recipe_telemetry_metadata={ + "recipe_name": "Quantize", + "recipe_command": "Quantize", + "recipe_source": "generated_cli", + "recipe_format": "generated", + }, + ) + + metadata = mock_log_recipe_result.call_args.kwargs["metadata"] + assert metadata["package_config_provided"] is True + package_config_overrides = json.loads(metadata["package_config_overrides"]) + assert package_config_overrides["passes"][0]["supported_providers"] == ["CPUExecutionProvider"] + assert "module_path" not in package_config_overrides["passes"][0] + assert package_config_overrides["extra_dependencies"]["custom_accelerator"] == ["custom-package"] + + +def test_classify_run_config_source_handles_non_pathlike_object(): + assert _classify_run_config_source(object()) == ("config_object", "object") + + +def test_classify_input_model_source_does_not_depend_on_local_filesystem(tmp_path, monkeypatch): + assert _classify_input_model_source("Qwen/Qwen2.5-0.5B-Instruct") == "string_name" + + monkeypatch.chdir(tmp_path) + (tmp_path / "bert-base-uncased").mkdir() + + assert _classify_input_model_source("bert-base-uncased") == "string_name" + assert _classify_input_model_source("./model.onnx") == "local_file" + assert _classify_input_model_source("model.onnx") == "local_file" + + +def test_recipe_hash_does_not_depend_on_local_model_path_presence(tmp_path, monkeypatch): + config = { + "input_model": {"type": "HfModel", "config": {"model_path": "bert-base-uncased"}}, + "engine": {"output_dir": "output"}, + } + recipe_hash = _build_recipe_hash(config) + + monkeypatch.chdir(tmp_path) + (tmp_path / "bert-base-uncased").mkdir() + + assert _build_recipe_hash(config) == recipe_hash + + +def test_recipe_hash_handles_path_values(): + config = { + "input_model": {"type": "HfModel", "config": {"model_path": Path("model")}}, + "custom_value": Path("custom"), + } + + assert _build_recipe_hash(config)