From b2e96133cdd40a95166a6d9c6dfac828009ce31d Mon Sep 17 00:00:00 2001 From: vinay79n Date: Tue, 16 Jun 2026 15:35:24 +0530 Subject: [PATCH 1/7] feat: auto-apply Snowflake ownership / governance object tags on prod tables which get published. --- README.md | 1 + .../create_ownership_registry_view.md | 46 +++++ docs/metaflow/publish.md | 41 +++++ docs/metaflow/publish_pandas.md | 24 +++ pyproject.toml | 5 +- .../_snowflake/object_tags.py | 166 ++++++++++++++++++ src/ds_platform_utils/metaflow/__init__.py | 2 + src/ds_platform_utils/metaflow/pandas.py | 24 +++ src/ds_platform_utils/metaflow/registry.py | 56 ++++++ .../metaflow/write_audit_publish.py | 17 ++ .../unit_tests/snowflake/test__object_tags.py | 159 +++++++++++++++++ uv.lock | 2 +- 12 files changed, 540 insertions(+), 3 deletions(-) create mode 100644 docs/metaflow/create_ownership_registry_view.md create mode 100644 src/ds_platform_utils/_snowflake/object_tags.py create mode 100644 src/ds_platform_utils/metaflow/registry.py create mode 100644 tests/unit_tests/snowflake/test__object_tags.py diff --git a/README.md b/README.md index 9d4dad2..7eea4c5 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ ## Metaflow API Docs - [BatchInferencePipeline](docs/metaflow/batch_inference_pipeline.md) +- [create_ownership_registry_view](docs/metaflow/create_ownership_registry_view.md) - [make_pydantic_parser_fn](docs/metaflow/make_pydantic_parser_fn.md) - [publish](docs/metaflow/publish.md) - [publish_pandas](docs/metaflow/publish_pandas.md) diff --git a/docs/metaflow/create_ownership_registry_view.md b/docs/metaflow/create_ownership_registry_view.md new file mode 100644 index 0000000..0dd3e1a --- /dev/null +++ b/docs/metaflow/create_ownership_registry_view.md @@ -0,0 +1,46 @@ +# `create_ownership_registry_view` + +Source: `ds_platform_utils.metaflow.registry.create_ownership_registry_view` + +Creates (or replaces) the central **table-ownership registry view**, +`PATTERN_DB.DATA_SCIENCE.TABLE_OWNERSHIP_REGISTRY`. 
The view pivots the object tags +applied by [`publish`](publish.md) / [`publish_pandas`](publish_pandas.md) into one row +per table, exposing `owner`, `team`, `domain`, `project`, `status`, `sla` and `contact`. + +This is a one-time admin helper. + +## Signature + +```python +create_ownership_registry_view(conn: SnowflakeConnection | None = None) -> str +``` + +| Parameter | Type | Required | Description | +| --------- | ----------------------------- | -------: | ------------------------------------------------------------------------ | +| `conn` | `SnowflakeConnection \| None` | No | Open Snowflake connection. If omitted, one is created via `get_snowflake_connection()`. | + +**Returns:** the executed `CREATE OR REPLACE VIEW` SQL string. + +## Usage + +```python +from ds_platform_utils.metaflow import create_ownership_registry_view + +create_ownership_registry_view() +``` + +Then query it: + +```sql +SELECT * FROM PATTERN_DB.DATA_SCIENCE.TABLE_OWNERSHIP_REGISTRY +ORDER BY team, table_name; +``` + +## Notes + +- **No refresh needed.** A view is not materialized — it re-runs its query on every read, + so it is always live. +- **~2h lag.** The view reads `SNOWFLAKE.ACCOUNT_USAGE.TAG_REFERENCES`, which itself lags + up to ~2 hours. For the current value of a single table's tag, use + `SYSTEM$GET_TAG('PATTERN_DB.DATA_SCIENCE.TABLE_OWNER', 'PATTERN_DB.DATA_SCIENCE.MY_TABLE', 'table')` instead. +- **Adoption-based.** Only tables that have at least one ownership tag appear in the view. diff --git a/docs/metaflow/publish.md b/docs/metaflow/publish.md index 13485d9..e99d146 100644 --- a/docs/metaflow/publish.md +++ b/docs/metaflow/publish.md @@ -14,6 +14,7 @@ publish( ctx: dict[str, Any] | None = None, warehouse: Literal["XS", "MED", "XL"] = None, use_utc: bool = True, + tags: dict[str, str] | None = None, ) -> None ``` @@ -22,6 +23,8 @@ publish( - Reads SQL from a string or `.sql` path. - Runs write/audit/publish operations through Snowflake. 
- Adds operation details and table links to the Metaflow card when available. +- **Automatically applies ownership object tags to production tables** (see + [Ownership tags](#ownership-tags) below). ## Parameters @@ -33,6 +36,7 @@ publish( | `ctx` | `dict[str, Any] \| None` | No | Optional template substitution context for SQL operations. | | `warehouse` | `Literal["XS", "MED", "XL"] \| None` | No | Snowflake warehouse override for this operation. Supports `XS`/`MED`/`XL` shortcuts or a full warehouse name. | | `use_utc` | `bool` | No | If `True`, uses UTC timezone for Snowflake session. | +| `tags` | `dict[str, str] \| None` | No | Overrides for the ownership object tags applied to the published table. See [Ownership tags](#ownership-tags).| **Returns:** `None` @@ -47,3 +51,40 @@ publish( audits=["SELECT COUNT(*) > 0 FROM PATTERN_DB.{{schema}}.{{table_name}}"], ) ``` + +## Ownership tags + +When publishing to **production**, `publish()` automatically applies the table-ownership +object tags from the table-ownership RFC. The seven tags are: + +| Tag | Source | Always set? | +| --------------- | ------------------------------------------------------- | --------------- | +| `TABLE_OWNER` | Metaflow `current.username` | yes | +| `TABLE_TEAM` | `data-science` | yes | +| `TABLE_DOMAIN` | `ds.domain` Metaflow tag | yes | +| `TABLE_PROJECT` | `ds.project` Metaflow tag | yes | +| `TABLE_STATUS` | `active` (override allows `active`/`deprecated`/`archived`) | yes | +| `TABLE_SLA` | override only (`realtime`/`hourly`/`daily`/`weekly`/`ad_hoc`) | only if given | +| `TABLE_CONTACT` | override only (Slack channel or email) | only if given | + +Pass `tags=` to override any value. 
Keys may be `owner`/`team`/`domain`/`project`/ +`status`/`sla`/`contact` (optionally `TABLE_`-prefixed): + +```python +publish( + table_name="OUT_OF_STOCK_ADS", + query="sql/create_training_data.sql", + tags={"sla": "daily", "contact": "#ds-recsys", "status": "active"}, +) +``` + +Notes: + +- Tags are applied **only to production tables**. Non-prod (`DATA_SCIENCE_STAGE`) runs + apply no tags. +- The tag *definitions* must first be created once by a Snowflake admin (the RFC + `CREATE TAG` setup). Until then, tagging is **skipped with a warning** — the publish + still succeeds. +- Invalid `status`/`sla` values raise `ValueError` before any data is written. +- Tagged tables surface in the `TABLE_OWNERSHIP_REGISTRY` view (see + `create_ownership_registry_view`). diff --git a/docs/metaflow/publish_pandas.md b/docs/metaflow/publish_pandas.md index b00185d..a502405 100644 --- a/docs/metaflow/publish_pandas.md +++ b/docs/metaflow/publish_pandas.md @@ -22,6 +22,7 @@ publish_pandas( use_utc: bool = True, use_s3_stage: bool = False, table_definition: list[tuple[str, str]] | None = None, + tags: dict[str, str] | None = None, ) -> None ``` @@ -30,6 +31,8 @@ publish_pandas( - Validates DataFrame input. - Writes directly via `write_pandas` or via S3 stage flow for large data. - Adds a Snowflake table URL to Metaflow card output. +- **Automatically applies ownership object tags to production tables** (see + [Ownership tags](#ownership-tags) below). ## Parameters @@ -49,9 +52,30 @@ publish_pandas( | `use_utc` | `bool` | No | If `True`, uses UTC timezone for Snowflake session. | | `use_s3_stage` | `bool` | No | If `True`, publishes via S3 stage flow; otherwise uses direct `write_pandas`. | | `table_definition` | `list[tuple[str, str]] \| None` | No | Optional Snowflake table schema; used by S3 stage flow when table creation is needed. | +| `tags` | `dict[str, str] \| None` | No | Overrides for the ownership object tags applied to the published table. 
See [Ownership tags](#ownership-tags).| **Returns:** `None` +## Ownership tags + +When publishing to **production**, `publish_pandas()` automatically applies the same +seven table-ownership object tags as [`publish`](publish.md#ownership-tags): +`TABLE_OWNER`, `TABLE_TEAM`, `TABLE_DOMAIN`, `TABLE_PROJECT`, `TABLE_STATUS` and +(when provided via `tags=`) `TABLE_SLA` / `TABLE_CONTACT`. + +```python +publish_pandas( + table_name="MY_TABLE", + df=df, + tags={"sla": "daily", "contact": "#ds-recsys"}, +) +``` + +- Tags are applied **only to production tables**; non-prod runs apply none. +- Tag *definitions* must first be created by a Snowflake admin (RFC `CREATE TAG` setup); + until then tagging is **skipped with a warning** and the publish still succeeds. +- Invalid `status`/`sla` values raise `ValueError` before any data is written. + ## Limitations - When `use_s3_stage=True`, some column data types may not map exactly as expected between pandas/parquet and Snowflake. diff --git a/pyproject.toml b/pyproject.toml index 121b9cd..b2d50ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,12 @@ [project] name = "ds-platform-utils" -version = "0.4.2" +version = "0.5.0" description = "Utility library for Pattern Data Science." readme = "README.md" authors = [ { name = "Amit Vikram Raj", email = "amit.raj@pattern.com" }, - { name = "Eric Riddoch", email = "eric.riddoch@pattern.com" } + { name = "Eric Riddoch", email = "eric.riddoch@pattern.com" }, + { name = "Vinay Shende", email = "vinay.shende@pattern.com" } ] # requires-python = ">=3.7" dependencies = [ diff --git a/src/ds_platform_utils/_snowflake/object_tags.py b/src/ds_platform_utils/_snowflake/object_tags.py new file mode 100644 index 0000000..e9a1b16 --- /dev/null +++ b/src/ds_platform_utils/_snowflake/object_tags.py @@ -0,0 +1,166 @@ +"""Build and apply Snowflake object tags for table ownership / governance. + +Implements the tag schema from the "Snowflake table ownership via object tags" RFC. 
+Tags are applied only to production tables, so both the tag *definitions* and the +*tables* live in ``PATTERN_DB.DATA_SCIENCE``. + +The tag *definitions* must be created once by a Snowflake admin (see the RFC's +``CREATE TAG`` setup). Until they exist, :func:`apply_table_tags` warns and leaves the +(already successful) table write untouched -- tagging must never break a publish. +""" + +from typing import TYPE_CHECKING, Dict, Optional + +from ds_platform_utils._snowflake.run_query import _execute_sql +from ds_platform_utils.metaflow._consts import PROD_SCHEMA +from ds_platform_utils.sql_utils import get_select_dev_query_tags + +if TYPE_CHECKING: + from snowflake.connector import SnowflakeConnection + +DATABASE = "PATTERN_DB" + +# RFC allowed-value lists for the constrained tags. +TABLE_STATUS_ALLOWED = {"active", "deprecated", "archived"} +TABLE_SLA_ALLOWED = {"realtime", "hourly", "daily", "weekly", "ad_hoc"} +DEFAULT_TABLE_STATUS = "active" + +# All seven RFC tag names. +TAG_OWNER = "TABLE_OWNER" +TAG_TEAM = "TABLE_TEAM" +TAG_DOMAIN = "TABLE_DOMAIN" +TAG_PROJECT = "TABLE_PROJECT" +TAG_STATUS = "TABLE_STATUS" +TAG_SLA = "TABLE_SLA" +TAG_CONTACT = "TABLE_CONTACT" + +# Maps accepted override keys (case-insensitive, with or without the ``TABLE_`` prefix) +# to the canonical tag name. +_OVERRIDE_ALIASES = { + "owner": TAG_OWNER, + "team": TAG_TEAM, + "domain": TAG_DOMAIN, + "project": TAG_PROJECT, + "status": TAG_STATUS, + "sla": TAG_SLA, + "contact": TAG_CONTACT, +} + + +def _normalize_overrides(tags_override: Optional[Dict[str, str]]) -> Dict[str, str]: + """Normalize caller override keys to canonical tag names. + + Accepts e.g. ``owner``, ``OWNER`` or ``TABLE_OWNER`` -> ``TABLE_OWNER``. + + :param tags_override: Raw override dict supplied by the caller. + :return: Override dict keyed by canonical tag name. + :raises ValueError: If an override key does not map to a known tag. 
+ """ + normalized: Dict[str, str] = {} + for key, value in (tags_override or {}).items(): + canonical = _OVERRIDE_ALIASES.get(key.strip().lower().removeprefix("table_")) + if canonical is None: + raise ValueError( + f"Unknown tag override key {key!r}. Allowed keys: {sorted(_OVERRIDE_ALIASES)} " + f"(optionally prefixed with 'TABLE_')." + ) + normalized[canonical] = value + return normalized + + +def build_table_tags( + tags_override: Optional[Dict[str, str]] = None, + current_obj: Optional[object] = None, +) -> Dict[str, str]: + """Build the final ``{TAG_NAME: value}`` dict to apply to a published table. + + OWNER / TEAM / DOMAIN / PROJECT are derived from the Metaflow run context (reusing + :func:`get_select_dev_query_tags`); STATUS defaults to ``active``. Any value may be + overridden via ``tags_override``. SLA and CONTACT are only included when supplied + via ``tags_override`` (they cannot be inferred). + + :param tags_override: Optional overrides, keyed by ``owner``/``TABLE_OWNER``/etc. + :param current_obj: Optional Metaflow ``current`` stand-in (for testing). + :return: Mapping of canonical tag name to value, ready to apply. + :raises ValueError: If STATUS or SLA is not in its allowed-value list, or an + override key is unknown. + """ + overrides = _normalize_overrides(tags_override) + derived = get_select_dev_query_tags(current_obj=current_obj) + + tags: Dict[str, str] = { + TAG_OWNER: derived["user"], + TAG_TEAM: derived["team"], + TAG_DOMAIN: derived["domain"], + TAG_PROJECT: derived["workload_id"], + TAG_STATUS: DEFAULT_TABLE_STATUS, + } + # SLA / CONTACT are only set when explicitly provided. 
+ tags.update(overrides) + + status = tags[TAG_STATUS] + if status not in TABLE_STATUS_ALLOWED: + raise ValueError(f"TABLE_STATUS must be one of {sorted(TABLE_STATUS_ALLOWED)}, got {status!r}.") + + sla = tags.get(TAG_SLA) + if sla is not None and sla not in TABLE_SLA_ALLOWED: + raise ValueError(f"TABLE_SLA must be one of {sorted(TABLE_SLA_ALLOWED)}, got {sla!r}.") + + # Drop any tags whose value is None/empty so we never emit ``= ''``. + return {name: str(value) for name, value in tags.items() if value is not None and str(value) != ""} + + +def _quote(value: str) -> str: + """Escape a tag value for a single-quoted SQL literal (double embedded quotes).""" + return value.replace("'", "''") + + +def build_set_tag_sql(table_name: str, tags: Dict[str, str], schema: str = PROD_SCHEMA) -> str: + """Build a single ``ALTER TABLE ... SET TAG`` statement. + + Tag definitions and the table both live in ``schema`` (``DATA_SCIENCE`` for prod). + + :param table_name: Table to tag (upper-cased to match Snowflake's stored identifier). + :param tags: Mapping of tag name to value (e.g. from :func:`build_table_tags`). + :param schema: Schema holding both the table and the tag definitions. + :return: The ``ALTER TABLE`` SQL string. + :raises ValueError: If ``tags`` is empty. + """ + if not tags: + raise ValueError("No tags to apply.") + table = table_name.upper() + assignments = ",\n ".join(f"{DATABASE}.{schema}.{name} = '{_quote(value)}'" for name, value in tags.items()) + return f"ALTER TABLE {DATABASE}.{schema}.{table}\n SET TAG\n {assignments};" + + +def apply_table_tags( + conn: "SnowflakeConnection", + table_name: str, + tags: Dict[str, str], + schema: str = PROD_SCHEMA, +) -> None: + """Apply object tags to a published table, warning (never raising) on failure. + + A failure here most commonly means the tag definitions have not yet been created by + an admin (see the RFC ``CREATE TAG`` setup). 
Because the table write has already + succeeded by this point, we log a clear warning and return rather than breaking the + publish. + + :param conn: Open Snowflake connection. + :param table_name: Table to tag. + :param tags: Mapping of tag name to value. + :param schema: Schema holding both the table and the tag definitions. + """ + if not tags: + return + sql = build_set_tag_sql(table_name=table_name, tags=tags, schema=schema) + try: + _execute_sql(conn, sql) + conn.commit() + print(f"Applied ownership tags to {DATABASE}.{schema}.{table_name.upper()}: {sorted(tags)}") + except Exception as exc: # noqa: BLE001 -- tagging must never break a successful publish + print( + f"Warning: failed to apply ownership tags to {DATABASE}.{schema}.{table_name.upper()} " + f"({exc}). The table was published successfully; tags were skipped. This usually means the " + f"tag definitions have not been created yet by a Snowflake admin (see the table-ownership RFC)." + ) diff --git a/src/ds_platform_utils/metaflow/__init__.py b/src/ds_platform_utils/metaflow/__init__.py index 5a88f4f..403fc2d 100644 --- a/src/ds_platform_utils/metaflow/__init__.py +++ b/src/ds_platform_utils/metaflow/__init__.py @@ -1,11 +1,13 @@ from .batch_inference_pipeline import BatchInferencePipeline from .pandas import publish_pandas, query_pandas_from_snowflake +from .registry import create_ownership_registry_view from .restore_step_state import restore_step_state from .validate_config import make_pydantic_parser_fn from .write_audit_publish import publish __all__ = [ "BatchInferencePipeline", + "create_ownership_registry_view", "make_pydantic_parser_fn", "publish", "publish_pandas", diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 9fbcd58..9638529 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -44,6 +44,7 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) use_utc: bool = True, use_s3_stage: 
bool = False, table_definition: Optional[List[Tuple[str, str]]] = None, + tags: Optional[Dict[str, str]] = None, ) -> None: """Store a pandas dataframe as a Snowflake table. @@ -89,13 +90,26 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) :param table_definition: Optional list of tuples specifying the column names and types for the Snowflake table. This is only used when `use_s3_stage` is True, and is required in that case. The list should be in the format: `[(col_name1, col_type1), (col_name2, col_type2), ...]`, where `col_type` is a valid Snowflake data type (e.g., 'STRING', 'NUMBER', 'TIMESTAMP_NTZ', etc.). + + :param tags: Optional overrides for the ownership/governance object tags applied to the published + table (see the table-ownership RFC). Keys may be `owner`/`team`/`domain`/`project`/`status`/`sla`/ + `contact` (optionally `TABLE_`-prefixed). OWNER/TEAM/DOMAIN/PROJECT are derived from the Metaflow + run context when not overridden, STATUS defaults to `active`, and SLA/CONTACT are only applied when + provided here. Tags are only applied to **production** tables; in non-prod runs no tags are applied. + If the tag definitions have not yet been created by a Snowflake admin, tagging is skipped with a + warning (the publish still succeeds). """ + from ds_platform_utils._snowflake.object_tags import apply_table_tags, build_table_tags + if not isinstance(df, pd.DataFrame): raise TypeError("df must be a pandas DataFrame.") if df.empty: raise ValueError("DataFrame is empty.") + # Build/validate tags up front so an invalid status/sla fails fast, before any writes. + table_tags = build_table_tags(tags_override=tags) + if add_created_date: df["created_date"] = datetime.now().astimezone(pytz.utc) @@ -136,6 +150,12 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) use_logical_type=use_logical_type, ) + # Tag the published table (prod only). The S3 path has no open connection, so open one. 
+ if current.is_production: + tag_conn: SnowflakeConnection = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) + apply_table_tags(conn=tag_conn, table_name=table_name, tags=table_tags) + tag_conn.close() + else: conn: SnowflakeConnection = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) _execute_sql(conn, f"USE SCHEMA PATTERN_DB.{schema};") @@ -154,6 +174,10 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) overwrite=overwrite, use_logical_type=use_logical_type, ) + + # Tag the published table (prod only), reusing the open connection before closing it. + if current.is_production: + apply_table_tags(conn=conn, table_name=table_name, tags=table_tags) conn.close() # Add a link to the table in Snowflake to the card diff --git a/src/ds_platform_utils/metaflow/registry.py b/src/ds_platform_utils/metaflow/registry.py new file mode 100644 index 0000000..a39f4ed --- /dev/null +++ b/src/ds_platform_utils/metaflow/registry.py @@ -0,0 +1,56 @@ +"""Central table-ownership registry view (RFC §6). + +Exposes the ownership tags applied by :func:`publish` / :func:`publish_pandas` as a +single queryable view. The view is *not* materialized -- it is always live at query +time. Its only staleness is the inherent ~2h lag of +``SNOWFLAKE.ACCOUNT_USAGE.TAG_REFERENCES`` (see the RFC risks section); no periodic +refresh is needed. It is adoption-based: only tables that have at least one ownership +tag appear. 
+""" + +from typing import Optional + +from ds_platform_utils._snowflake.run_query import _execute_sql + +REGISTRY_VIEW_NAME = "PATTERN_DB.DATA_SCIENCE.TABLE_OWNERSHIP_REGISTRY" + +OWNERSHIP_REGISTRY_VIEW_SQL = f""" +CREATE OR REPLACE VIEW {REGISTRY_VIEW_NAME} AS +SELECT + tr.object_name AS table_name, + MAX(CASE WHEN tr.tag_name = 'TABLE_OWNER' THEN tr.tag_value END) AS owner, + MAX(CASE WHEN tr.tag_name = 'TABLE_TEAM' THEN tr.tag_value END) AS team, + MAX(CASE WHEN tr.tag_name = 'TABLE_DOMAIN' THEN tr.tag_value END) AS domain, + MAX(CASE WHEN tr.tag_name = 'TABLE_PROJECT' THEN tr.tag_value END) AS project, + MAX(CASE WHEN tr.tag_name = 'TABLE_STATUS' THEN tr.tag_value END) AS status, + MAX(CASE WHEN tr.tag_name = 'TABLE_SLA' THEN tr.tag_value END) AS sla, + MAX(CASE WHEN tr.tag_name = 'TABLE_CONTACT' THEN tr.tag_value END) AS contact +FROM SNOWFLAKE.ACCOUNT_USAGE.TAG_REFERENCES tr +WHERE tr.object_database = 'PATTERN_DB' + AND tr.object_schema = 'DATA_SCIENCE' + AND tr.domain = 'TABLE' + AND tr.tag_name IN ( + 'TABLE_OWNER', 'TABLE_TEAM', 'TABLE_DOMAIN', 'TABLE_PROJECT', + 'TABLE_STATUS', 'TABLE_SLA', 'TABLE_CONTACT' + ) +GROUP BY tr.object_name; +""" + + +def create_ownership_registry_view(conn: Optional["object"] = None) -> str: + """Create (or replace) the table-ownership registry view. + + Intended as a one-time admin helper. If ``conn`` is omitted, a connection is opened + via :func:`get_snowflake_connection`. + + :param conn: Optional open Snowflake connection. If None, one is created. + :return: The executed ``CREATE OR REPLACE VIEW`` SQL. 
+ """ + if conn is None: + from ds_platform_utils.metaflow.snowflake_connection import get_snowflake_connection + + conn = get_snowflake_connection() + _execute_sql(conn, OWNERSHIP_REGISTRY_VIEW_SQL) + conn.commit() + print(f"Created/replaced view {REGISTRY_VIEW_NAME}.") + return OWNERSHIP_REGISTRY_VIEW_SQL diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index 1f1a709..8b20fd0 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -25,6 +25,7 @@ def publish( # noqa: PLR0913, D417 ctx: Optional[Dict[str, Any]] = None, warehouse: Optional[Union[Literal["XS", "MED", "XL"], str]] = None, use_utc: bool = True, + tags: Optional[Dict[str, str]] = None, ) -> None: """Publish a Snowflake table using the write-audit-publish (WAP) pattern via Metaflow's Snowflake connection. @@ -43,6 +44,13 @@ def publish( # noqa: PLR0913, D417 when running in the Outerbounds **Default** perimeter, and to the `OUTERBOUNDS_DATA_SCIENCE_SHARED_PROD_XS_WH` warehouse, when running in the Outerbounds **PROD** perimeter. :param use_utc: Whether to use UTC timezone for the Snowflake connection (affects timestamp fields). + :param tags: Optional overrides for the ownership/governance object tags applied to the published + table (see the table-ownership RFC). Keys may be ``owner``/``team``/``domain``/``project``/ + ``status``/``sla``/``contact`` (optionally ``TABLE_``-prefixed). OWNER/TEAM/DOMAIN/PROJECT are + derived from the Metaflow run context when not overridden, STATUS defaults to ``active``, and + SLA/CONTACT are only applied when provided here. Tags are only applied to **production** tables; + in non-prod runs no tags are applied. If the tag definitions have not yet been created by a + Snowflake admin, tagging is skipped with a warning (the publish still succeeds). 
Returns ------- @@ -58,12 +66,17 @@ def publish( # noqa: PLR0913, D417 query="sql/create_training_data.sql", audits=["sql/validate_training_data.sql"], warehouse="OUTERBOUNDS_DATA_SCIENCE_SHARED_DEV_XL_WH", + tags={"sla": "daily", "contact": "#ds-recsys"}, ) ``` """ + from ds_platform_utils._snowflake.object_tags import apply_table_tags, build_table_tags from ds_platform_utils._snowflake.write_audit_publish import write_audit_publish + # Build/validate tags up front so an invalid status/sla fails fast, before any writes. + table_tags = build_table_tags(tags_override=tags) + conn = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) query = get_query_from_string_or_fpath(query) @@ -86,6 +99,10 @@ def publish( # noqa: PLR0913, D417 ) last_op_was_write = operation.operation_type == "write" + # Tag the final table (prod only). Done after the SWAP so tags land on the live table. + if current.is_production: + apply_table_tags(conn=cur.connection, table_name=table_name, tags=table_tags) + def update_card_with_operation_info( operation: "SQLOperation", diff --git a/tests/unit_tests/snowflake/test__object_tags.py b/tests/unit_tests/snowflake/test__object_tags.py new file mode 100644 index 0000000..d09ad86 --- /dev/null +++ b/tests/unit_tests/snowflake/test__object_tags.py @@ -0,0 +1,159 @@ +import pytest + +from ds_platform_utils._snowflake import object_tags +from ds_platform_utils._snowflake.object_tags import ( + apply_table_tags, + build_set_tag_sql, + build_table_tags, +) + + +class FakeCurrent: + """Stand-in for ``metaflow.current`` used to drive tag derivation in tests.""" + + tags = ["ds.domain:recommendations", "ds.project:two_tower_v2"] + flow_name = "MyFlow" + project_name = "recsys-proj" + step_name = "end" + run_id = "123" + username = "john_doe" + namespace = "user:john" + is_production = True + + +def test_build_table_tags_derives_all_mappings(): + """All four context-derived tags + default STATUS are present; SLA/CONTACT omitted.""" + tags = 
build_table_tags(current_obj=FakeCurrent()) + + assert tags["TABLE_OWNER"] == "john_doe" + assert tags["TABLE_TEAM"] == "data-science" + assert tags["TABLE_DOMAIN"] == "recommendations" + assert tags["TABLE_PROJECT"] == "two_tower_v2" + assert tags["TABLE_STATUS"] == "active" + assert "TABLE_SLA" not in tags + assert "TABLE_CONTACT" not in tags + + +def test_build_table_tags_overrides_win(): + """Overrides (incl. alias + cased keys) replace derived values and add SLA/CONTACT.""" + tags = build_table_tags( + tags_override={"owner": "jane", "SLA": "daily", "TABLE_CONTACT": "#ds-recsys"}, + current_obj=FakeCurrent(), + ) + + assert tags["TABLE_OWNER"] == "jane" + assert tags["TABLE_SLA"] == "daily" + assert tags["TABLE_CONTACT"] == "#ds-recsys" + # Non-overridden derived values still present. + assert tags["TABLE_DOMAIN"] == "recommendations" + + +@pytest.mark.parametrize("override", [{"status": "bogus"}, {"sla": "every_minute"}]) +def test_build_table_tags_invalid_constrained_value_raises(override): + """Invalid STATUS or SLA values raise ValueError (caller error).""" + with pytest.raises(ValueError): + build_table_tags(tags_override=override, current_obj=FakeCurrent()) + + +def test_build_table_tags_unknown_key_raises(): + """An unrecognized override key raises ValueError.""" + with pytest.raises(ValueError, match="Unknown tag override key"): + build_table_tags(tags_override={"foo": "bar"}, current_obj=FakeCurrent()) + + +def test_build_set_tag_sql_format_and_escaping(): + """SQL targets DATA_SCIENCE for both table and tag, upper-cases the table, escapes quotes.""" + sql = build_set_tag_sql(table_name="my_table", tags={"TABLE_OWNER": "o'brien"}) + + assert "ALTER TABLE PATTERN_DB.DATA_SCIENCE.MY_TABLE" in sql + assert "PATTERN_DB.DATA_SCIENCE.TABLE_OWNER = 'o''brien'" in sql + assert sql.strip().endswith(";") + + +def test_build_set_tag_sql_empty_raises(): + with pytest.raises(ValueError, match="No tags to apply"): + build_set_tag_sql(table_name="t", tags={}) + + 
+def test_build_set_tag_sql_multiple_tags_joined(): + """Multiple tags are comma-joined under a single SET TAG / single trailing semicolon.""" + sql = build_set_tag_sql( + table_name="my_table", + tags={"TABLE_OWNER": "john_doe", "TABLE_TEAM": "data-science", "TABLE_STATUS": "active"}, + ) + + assert sql.count("SET TAG") == 1 + assert "PATTERN_DB.DATA_SCIENCE.TABLE_OWNER = 'john_doe'," in sql + assert "PATTERN_DB.DATA_SCIENCE.TABLE_TEAM = 'data-science'," in sql + assert "PATTERN_DB.DATA_SCIENCE.TABLE_STATUS = 'active'" in sql + # Exactly one statement terminator, on the last assignment only. + assert sql.count(";") == 1 + assert sql.count("=") == 3 + + +def test_build_table_tags_drops_empty_override_value(): + """An empty-string override is dropped rather than emitted as TAG = ''.""" + tags = build_table_tags(tags_override={"contact": ""}, current_obj=FakeCurrent()) + + assert "TABLE_CONTACT" not in tags + + +class FakeConn: + def __init__(self): + self.committed = False + + def commit(self): + """Record that commit was called.""" + self.committed = True + + +def test_apply_table_tags_swallows_errors_and_warns(monkeypatch, capsys): + """A failure applying tags must not raise and must not break the publish.""" + + def _boom(*_args, **_kwargs): + raise RuntimeError("tag 'TABLE_OWNER' does not exist") + + monkeypatch.setattr(object_tags, "_execute_sql", _boom) + conn = FakeConn() + + apply_table_tags(conn=conn, table_name="my_table", tags={"TABLE_OWNER": "john_doe"}) + + assert conn.committed is False + assert "Warning: failed to apply ownership tags" in capsys.readouterr().out + + +def test_apply_table_tags_success_executes_and_commits(monkeypatch, capsys): + """Happy path: the built SQL is executed against the conn and the change is committed.""" + captured = {} + + def _capture(conn, sql): + captured["conn"] = conn + captured["sql"] = sql + + monkeypatch.setattr(object_tags, "_execute_sql", _capture) + conn = FakeConn() + + apply_table_tags(conn=conn, 
table_name="my_table", tags={"TABLE_OWNER": "john_doe"}) + + assert captured["conn"] is conn + assert "ALTER TABLE PATTERN_DB.DATA_SCIENCE.MY_TABLE" in captured["sql"] + assert "PATTERN_DB.DATA_SCIENCE.TABLE_OWNER = 'john_doe'" in captured["sql"] + assert conn.committed is True + assert "Applied ownership tags" in capsys.readouterr().out + + +def test_apply_table_tags_noop_on_empty(monkeypatch): + """No tags -> no execution, no commit.""" + called = False + + def _spy(*_args, **_kwargs): + nonlocal called + called = True + + monkeypatch.setattr(object_tags, "_execute_sql", _spy) + conn = FakeConn() + + apply_table_tags(conn=conn, table_name="my_table", tags={}) + + assert called is False + assert conn.committed is False diff --git a/uv.lock b/uv.lock index a099e72..37078e2 100644 --- a/uv.lock +++ b/uv.lock @@ -479,7 +479,7 @@ wheels = [ [[package]] name = "ds-platform-utils" -version = "0.4.2" +version = "0.5.0" source = { editable = "." } dependencies = [ { name = "jinja2" }, From efc68b69653dda1c1306e06fd9b61489e5095f48 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Tue, 16 Jun 2026 16:04:55 +0530 Subject: [PATCH 2/7] feat: enhance tagging functionality with identifier validation and improved error handling , github copilot suggested code chnages --- docs/metaflow/publish.md | 11 +++++- docs/metaflow/publish_pandas.md | 4 ++ .../_snowflake/object_tags.py | 30 ++++++++++++++- src/ds_platform_utils/metaflow/pandas.py | 35 +++++++++++++++-- .../unit_tests/snowflake/test__object_tags.py | 38 +++++++++++++++++++ 5 files changed, 111 insertions(+), 7 deletions(-) diff --git a/docs/metaflow/publish.md b/docs/metaflow/publish.md index e99d146..62c5e7e 100644 --- a/docs/metaflow/publish.md +++ b/docs/metaflow/publish.md @@ -61,12 +61,19 @@ object tags from the table-ownership RFC. 
The seven tags are: | --------------- | ------------------------------------------------------- | --------------- | | `TABLE_OWNER` | Metaflow `current.username` | yes | | `TABLE_TEAM` | `data-science` | yes | -| `TABLE_DOMAIN` | `ds.domain` Metaflow tag | yes | -| `TABLE_PROJECT` | `ds.project` Metaflow tag | yes | +| `TABLE_DOMAIN` | `ds.domain` Metaflow tag, else `unknown` | yes | +| `TABLE_PROJECT` | `ds.project` Metaflow tag, else `unknown` | yes | | `TABLE_STATUS` | `active` (override allows `active`/`deprecated`/`archived`) | yes | | `TABLE_SLA` | override only (`realtime`/`hourly`/`daily`/`weekly`/`ad_hoc`) | only if given | | `TABLE_CONTACT` | override only (Slack channel or email) | only if given | +> **`TABLE_DOMAIN` / `TABLE_PROJECT` depend on flow tags.** These are read from the +> `ds.domain` / `ds.project` Metaflow tags. If a flow runs without them, the value falls +> back to the literal string `unknown` and a warning is printed (the same warning used +> for select.dev cost tracking). Make sure your flow carries `--tag "ds.domain:..."` and +> `--tag "ds.project:..."` — these are applied automatically in CI and the standard `poe` +> run commands in the monorepo — or pass `tags={"domain": ..., "project": ...}` explicitly. + Pass `tags=` to override any value. Keys may be `owner`/`team`/`domain`/`project`/ `status`/`sla`/`contact` (optionally `TABLE_`-prefixed): diff --git a/docs/metaflow/publish_pandas.md b/docs/metaflow/publish_pandas.md index a502405..e41573e 100644 --- a/docs/metaflow/publish_pandas.md +++ b/docs/metaflow/publish_pandas.md @@ -72,6 +72,10 @@ publish_pandas( ``` - Tags are applied **only to production tables**; non-prod runs apply none. +- `TABLE_DOMAIN` / `TABLE_PROJECT` come from the `ds.domain` / `ds.project` Metaflow tags; + if a flow runs without them they fall back to the literal `unknown` and a warning is + printed. 
Ensure the flow carries those tags (automatic in CI / standard `poe` commands) + or pass `tags={"domain": ..., "project": ...}`. See [`publish`](publish.md#ownership-tags). - Tag *definitions* must first be created by a Snowflake admin (RFC `CREATE TAG` setup); until then tagging is **skipped with a warning** and the publish still succeeds. - Invalid `status`/`sla` values raise `ValueError` before any data is written. diff --git a/src/ds_platform_utils/_snowflake/object_tags.py b/src/ds_platform_utils/_snowflake/object_tags.py index e9a1b16..c00d1b0 100644 --- a/src/ds_platform_utils/_snowflake/object_tags.py +++ b/src/ds_platform_utils/_snowflake/object_tags.py @@ -9,6 +9,7 @@ (already successful) table write untouched -- tagging must never break a publish. """ +import re from typing import TYPE_CHECKING, Dict, Optional from ds_platform_utils._snowflake.run_query import _execute_sql @@ -20,6 +21,12 @@ DATABASE = "PATTERN_DB" +# A Snowflake unquoted identifier: starts with a letter/underscore, then letters/digits/underscores. +# Identifiers (table name, schema, tag names) are interpolated directly into the SET TAG SQL, so we +# reject anything else to avoid malformed SQL or statement injection. (Tag *values* are safely +# single-quoted + escaped via _quote and are not subject to this check.) +_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + # RFC allowed-value lists for the constrained tags. TABLE_STATUS_ALLOWED = {"active", "deprecated", "archived"} TABLE_SLA_ALLOWED = {"realtime", "hourly", "daily", "weekly", "ad_hoc"} @@ -115,6 +122,20 @@ def _quote(value: str) -> str: return value.replace("'", "''") +def _validate_identifier(value: str, kind: str) -> None: + """Reject anything that isn't a plain unquoted SQL identifier. + + Identifiers are interpolated unquoted into the ``SET TAG`` SQL, so a value containing + e.g. ``;`` or whitespace could produce invalid SQL or statement injection. 
+ + :param value: Identifier to check (table name, schema, or tag name). + :param kind: Human-readable label used in the error message. + :raises ValueError: If ``value`` is not a valid unquoted identifier. + """ + if not _IDENTIFIER_RE.match(value): + raise ValueError(f"Invalid {kind} {value!r}; expected an unquoted identifier (letters/numbers/underscore).") + + def build_set_tag_sql(table_name: str, tags: Dict[str, str], schema: str = PROD_SCHEMA) -> str: """Build a single ``ALTER TABLE ... SET TAG`` statement. @@ -124,11 +145,15 @@ def build_set_tag_sql(table_name: str, tags: Dict[str, str], schema: str = PROD_ :param tags: Mapping of tag name to value (e.g. from :func:`build_table_tags`). :param schema: Schema holding both the table and the tag definitions. :return: The ``ALTER TABLE`` SQL string. - :raises ValueError: If ``tags`` is empty. + :raises ValueError: If ``tags`` is empty, or any identifier (table/schema/tag name) is invalid. """ if not tags: raise ValueError("No tags to apply.") table = table_name.upper() + _validate_identifier(table, "table_name") + _validate_identifier(schema, "schema") + for name in tags: + _validate_identifier(name, "tag name") assignments = ",\n ".join(f"{DATABASE}.{schema}.{name} = '{_quote(value)}'" for name, value in tags.items()) return f"ALTER TABLE {DATABASE}.{schema}.{table}\n SET TAG\n {assignments};" @@ -153,8 +178,9 @@ def apply_table_tags( """ if not tags: return - sql = build_set_tag_sql(table_name=table_name, tags=tags, schema=schema) try: + # Built inside the try so identifier-validation errors warn-and-skip rather than break publish. 
+ sql = build_set_tag_sql(table_name=table_name, tags=tags, schema=schema) _execute_sql(conn, sql) conn.commit() print(f"Applied ownership tags to {DATABASE}.{schema}.{table_name.upper()}: {sorted(tags)}") diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 9638529..0d79c81 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -152,9 +152,9 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) # Tag the published table (prod only). The S3 path has no open connection, so open one. if current.is_production: - tag_conn: SnowflakeConnection = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) - apply_table_tags(conn=tag_conn, table_name=table_name, tags=table_tags) - tag_conn.close() + _tag_table_with_new_connection( + table_name=table_name, tags=table_tags, schema=schema, warehouse=warehouse, use_utc=use_utc + ) else: conn: SnowflakeConnection = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) @@ -189,6 +189,35 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) current.card.append(Markdown(f"[View table in Snowflake]({table_url})")) +def _tag_table_with_new_connection( + table_name: str, + tags: Dict[str, str], + schema: str, + warehouse: Optional[Union[Literal["XS", "MED", "XL"], str]], + use_utc: bool, +) -> None: + """Open a short-lived connection and tag an already-published table. + + Used by the S3-stage publish path, which has no open connection. Opening the + connection happens outside ``apply_table_tags``' own error handling, so we guard it + here too: tagging must never break an already-successful publish. 
+ """ + from ds_platform_utils._snowflake.object_tags import apply_table_tags + + tag_conn = None + try: + tag_conn = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) + apply_table_tags(conn=tag_conn, table_name=table_name, tags=tags) + except Exception as exc: # noqa: BLE001 -- tagging must never break a successful publish + print( + f"Warning: failed to open a Snowflake connection to tag PATTERN_DB.{schema}.{table_name} " + f"({exc}). The table was published successfully; tags were skipped." + ) + finally: + if tag_conn is not None: + tag_conn.close() + + def query_pandas_from_snowflake( query: Union[str, Path], warehouse: Optional[Union[Literal["XS", "MED", "XL"], str]] = None, diff --git a/tests/unit_tests/snowflake/test__object_tags.py b/tests/unit_tests/snowflake/test__object_tags.py index d09ad86..2e40a6a 100644 --- a/tests/unit_tests/snowflake/test__object_tags.py +++ b/tests/unit_tests/snowflake/test__object_tags.py @@ -98,6 +98,25 @@ def test_build_table_tags_drops_empty_override_value(): assert "TABLE_CONTACT" not in tags +@pytest.mark.parametrize("bad_table", ["bad; DROP TABLE x", "has space", "1leading_digit", "", "a-b"]) +def test_build_set_tag_sql_rejects_invalid_table_name(bad_table): + """Non-identifier table names are rejected before reaching SQL.""" + with pytest.raises(ValueError, match="Invalid table_name"): + build_set_tag_sql(table_name=bad_table, tags={"TABLE_OWNER": "john_doe"}) + + +def test_build_set_tag_sql_rejects_invalid_tag_name(): + """Non-identifier tag names are rejected.""" + with pytest.raises(ValueError, match="Invalid tag name"): + build_set_tag_sql(table_name="my_table", tags={"TABLE_OWNER; DROP": "x"}) + + +def test_build_set_tag_sql_rejects_invalid_schema(): + """Non-identifier schema is rejected.""" + with pytest.raises(ValueError, match="Invalid schema"): + build_set_tag_sql(table_name="my_table", tags={"TABLE_OWNER": "x"}, schema="DATA_SCIENCE; DROP") + + class FakeConn: def __init__(self): self.committed = 
False @@ -122,6 +141,25 @@ def _boom(*_args, **_kwargs): assert "Warning: failed to apply ownership tags" in capsys.readouterr().out +def test_apply_table_tags_invalid_identifier_warns_not_raises(monkeypatch, capsys): + """An invalid identifier must warn-and-skip, not propagate out of apply_table_tags.""" + executed = False + + def _spy(*_args, **_kwargs): + nonlocal executed + executed = True + + monkeypatch.setattr(object_tags, "_execute_sql", _spy) + conn = FakeConn() + + # A malformed table name would otherwise build invalid/injectable SQL. + apply_table_tags(conn=conn, table_name="bad; DROP TABLE x", tags={"TABLE_OWNER": "john_doe"}) + + assert executed is False # never reached execution + assert conn.committed is False + assert "Warning: failed to apply ownership tags" in capsys.readouterr().out + + def test_apply_table_tags_success_executes_and_commits(monkeypatch, capsys): """Happy path: the built SQL is executed against the conn and the change is committed.""" captured = {} From 069decb8ad9589eb07b5c7b6acdf0b986f59837b Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 17 Jun 2026 13:50:30 +0530 Subject: [PATCH 3/7] feat: expand allowed values for TABLE_STATUS and TABLE_SLA tags in object tagging --- docs/metaflow/publish.md | 4 ++-- src/ds_platform_utils/_snowflake/object_tags.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/metaflow/publish.md b/docs/metaflow/publish.md index 62c5e7e..89e766f 100644 --- a/docs/metaflow/publish.md +++ b/docs/metaflow/publish.md @@ -63,8 +63,8 @@ object tags from the table-ownership RFC. 
The seven tags are: | `TABLE_TEAM` | `data-science` | yes | | `TABLE_DOMAIN` | `ds.domain` Metaflow tag, else `unknown` | yes | | `TABLE_PROJECT` | `ds.project` Metaflow tag, else `unknown` | yes | -| `TABLE_STATUS` | `active` (override allows `active`/`deprecated`/`archived`) | yes | -| `TABLE_SLA` | override only (`realtime`/`hourly`/`daily`/`weekly`/`ad_hoc`) | only if given | +| `TABLE_STATUS` | `active` (override allows `active`/`development`/`testing`/`deprecated`/`archived`/`retired`) | yes | +| `TABLE_SLA` | override only (`streaming`/`realtime`/`hourly`/`daily`/`weekly`/`monthly`/`quarterly`/`ad_hoc`/`on_demand`) | only if given | | `TABLE_CONTACT` | override only (Slack channel or email) | only if given | > **`TABLE_DOMAIN` / `TABLE_PROJECT` depend on flow tags.** These are read from the diff --git a/src/ds_platform_utils/_snowflake/object_tags.py b/src/ds_platform_utils/_snowflake/object_tags.py index c00d1b0..02beb10 100644 --- a/src/ds_platform_utils/_snowflake/object_tags.py +++ b/src/ds_platform_utils/_snowflake/object_tags.py @@ -28,8 +28,8 @@ _IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") # RFC allowed-value lists for the constrained tags. -TABLE_STATUS_ALLOWED = {"active", "deprecated", "archived"} -TABLE_SLA_ALLOWED = {"realtime", "hourly", "daily", "weekly", "ad_hoc"} +TABLE_STATUS_ALLOWED = {"active", "development", "testing", "deprecated", "archived", "retired"} +TABLE_SLA_ALLOWED = {"streaming", "realtime", "hourly", "daily", "weekly", "monthly", "quarterly", "ad_hoc", "on_demand"} DEFAULT_TABLE_STATUS = "active" # All seven RFC tag names. 
From fbc084cf696b12b595319cc1b2a85da48723f757 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 17 Jun 2026 14:01:40 +0530 Subject: [PATCH 4/7] lint fix --- src/ds_platform_utils/_snowflake/object_tags.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/ds_platform_utils/_snowflake/object_tags.py b/src/ds_platform_utils/_snowflake/object_tags.py index 02beb10..f575306 100644 --- a/src/ds_platform_utils/_snowflake/object_tags.py +++ b/src/ds_platform_utils/_snowflake/object_tags.py @@ -28,8 +28,18 @@ _IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") # RFC allowed-value lists for the constrained tags. -TABLE_STATUS_ALLOWED = {"active", "development", "testing", "deprecated", "archived", "retired"} -TABLE_SLA_ALLOWED = {"streaming", "realtime", "hourly", "daily", "weekly", "monthly", "quarterly", "ad_hoc", "on_demand"} +TABLE_STATUS_ALLOWED = {"active", "development", "testing", "deprecated", "archived", "retired"} +TABLE_SLA_ALLOWED = { + "streaming", + "realtime", + "hourly", + "daily", + "weekly", + "monthly", + "quarterly", + "ad_hoc", + "on_demand", +} DEFAULT_TABLE_STATUS = "active" # All seven RFC tag names. From e00dcf03c3de8fde83d11d9ee5c7aaee6a92c14e Mon Sep 17 00:00:00 2001 From: vinay79n Date: Mon, 22 Jun 2026 16:49:18 +0530 Subject: [PATCH 5/7] feat: tag all published tables (prod + dev) with co-located ownership tags Owner now derives from domain (ds--team); tags co-locate with the table's schema; adds a functional test for the tagging path. 
--- docs/metaflow/publish.md | 29 ++- docs/metaflow/publish_pandas.md | 8 +- .../_snowflake/object_tags.py | 59 ++++-- src/ds_platform_utils/metaflow/pandas.py | 23 ++- .../metaflow/write_audit_publish.py | 16 +- .../metaflow/test__publish_tagging.py | 169 ++++++++++++++++++ .../unit_tests/snowflake/test__object_tags.py | 66 +++++-- 7 files changed, 312 insertions(+), 58 deletions(-) create mode 100644 tests/functional_tests/metaflow/test__publish_tagging.py diff --git a/docs/metaflow/publish.md b/docs/metaflow/publish.md index 89e766f..4ac2205 100644 --- a/docs/metaflow/publish.md +++ b/docs/metaflow/publish.md @@ -23,7 +23,7 @@ publish( - Reads SQL from a string or `.sql` path. - Runs write/audit/publish operations through Snowflake. - Adds operation details and table links to the Metaflow card when available. -- **Automatically applies ownership object tags to production tables** (see +- **Automatically applies ownership object tags to every published table** (see [Ownership tags](#ownership-tags) below). ## Parameters @@ -54,12 +54,12 @@ publish( ## Ownership tags -When publishing to **production**, `publish()` automatically applies the table-ownership -object tags from the table-ownership RFC. The seven tags are: +On **every** publish, `publish()` automatically applies the table-ownership object tags +from the table-ownership RFC. The seven tags are: | Tag | Source | Always set? | | --------------- | ------------------------------------------------------- | --------------- | -| `TABLE_OWNER` | Metaflow `current.username` | yes | +| `TABLE_OWNER` | owning-team alias derived from the domain (`ds-<domain>-team`), else `unknown` | yes | | `TABLE_TEAM` | `data-science` | yes | | `TABLE_DOMAIN` | `ds.domain` Metaflow tag, else `unknown` | yes | | `TABLE_PROJECT` | `ds.project` Metaflow tag, else `unknown` | yes | @@ -67,12 +67,21 @@ object tags from the table-ownership RFC.
The seven tags are: | `TABLE_SLA` | override only (`streaming`/`realtime`/`hourly`/`daily`/`weekly`/`monthly`/`quarterly`/`ad_hoc`/`on_demand`) | only if given | | `TABLE_CONTACT` | override only (Slack channel or email) | only if given | +> **`TABLE_OWNER` is derived from the domain, not the run user.** Owner is resolved by +> priority: (1) an explicit `tags={"owner": ...}` override, else (2) the owning-team alias +> `ds-<domain>-team` when the domain is known (e.g. domain `advertising` → +> `ds-advertising-team`), else (3) `unknown`. We don't use `current.username`, because on +> deployed/argo runs it resolves to a service identity (`argo-workflows`) rather than a +> person. Pass `tags={"owner": ...}` to set a specific individual or alias. + > **`TABLE_DOMAIN` / `TABLE_PROJECT` depend on flow tags.** These are read from the > `ds.domain` / `ds.project` Metaflow tags. If a flow runs without them, the value falls > back to the literal string `unknown` and a warning is printed (the same warning used > for select.dev cost tracking). Make sure your flow carries `--tag "ds.domain:..."` and > `--tag "ds.project:..."` — these are applied automatically in CI and the standard `poe` > run commands in the monorepo — or pass `tags={"domain": ..., "project": ...}` explicitly. +> Note: because owner is derived from the domain, a missing domain also means +> `TABLE_OWNER` falls back to `unknown`. Pass `tags=` to override any value. Keys may be `owner`/`team`/`domain`/`project`/ `status`/`sla`/`contact` (optionally `TABLE_`-prefixed): @@ -87,10 +96,14 @@ publish( Notes: -- Tags are applied **only to production tables**. Non-prod (`DATA_SCIENCE_STAGE`) runs - apply no tags. -- The tag *definitions* must first be created once by a Snowflake admin (the RFC - `CREATE TAG` setup). Until then, tagging is **skipped with a warning** — the publish +- Tags are applied to **every** published table — prod tables in `DATA_SCIENCE` and + dev/stage tables in `DATA_SCIENCE_STAGE`.
Tags are **co-located with the table's schema**: + a prod table references the tag definitions in `DATA_SCIENCE`, a dev table references + those in `DATA_SCIENCE_STAGE`. So the definitions must exist in **each** schema, and the + publishing role needs `APPLY` on the tags **in that schema** (e.g. the dev role needs + `APPLY` on the `DATA_SCIENCE_STAGE` tags). +- The tag *definitions* must first be created once by a Snowflake admin in each schema + (the RFC `CREATE TAG` setup). Until then, tagging is **skipped with a warning** — the publish still succeeds. - Invalid `status`/`sla` values raise `ValueError` before any data is written. - Tagged tables surface in the `TABLE_OWNERSHIP_REGISTRY` view (see diff --git a/docs/metaflow/publish_pandas.md b/docs/metaflow/publish_pandas.md index e41573e..0892978 100644 --- a/docs/metaflow/publish_pandas.md +++ b/docs/metaflow/publish_pandas.md @@ -31,7 +31,7 @@ publish_pandas( - Validates DataFrame input. - Writes directly via `write_pandas` or via S3 stage flow for large data. - Adds a Snowflake table URL to Metaflow card output. -- **Automatically applies ownership object tags to production tables** (see +- **Automatically applies ownership object tags to every published table** (see [Ownership tags](#ownership-tags) below). ## Parameters @@ -58,7 +58,7 @@ publish_pandas( ## Ownership tags -When publishing to **production**, `publish_pandas()` automatically applies the same +On **every** publish, `publish_pandas()` automatically applies the same seven table-ownership object tags as [`publish`](publish.md#ownership-tags): `TABLE_OWNER`, `TABLE_TEAM`, `TABLE_DOMAIN`, `TABLE_PROJECT`, `TABLE_STATUS` and (when provided via `tags=`) `TABLE_SLA` / `TABLE_CONTACT`. @@ -71,7 +71,9 @@ publish_pandas( ) ``` -- Tags are applied **only to production tables**; non-prod runs apply none. +- Tags are applied to **every** published table — prod in `DATA_SCIENCE`, dev/stage in + `DATA_SCIENCE_STAGE`. 
Tags are co-located with the table's schema, so the definitions + must exist in **each** schema and the publishing role needs `APPLY` on the tags there. - `TABLE_DOMAIN` / `TABLE_PROJECT` come from the `ds.domain` / `ds.project` Metaflow tags; if a flow runs without them they fall back to the literal `unknown` and a warning is printed. Ensure the flow carries those tags (automatic in CI / standard `poe` commands) diff --git a/src/ds_platform_utils/_snowflake/object_tags.py b/src/ds_platform_utils/_snowflake/object_tags.py index f575306..14186ed 100644 --- a/src/ds_platform_utils/_snowflake/object_tags.py +++ b/src/ds_platform_utils/_snowflake/object_tags.py @@ -13,7 +13,6 @@ from typing import TYPE_CHECKING, Dict, Optional from ds_platform_utils._snowflake.run_query import _execute_sql -from ds_platform_utils.metaflow._consts import PROD_SCHEMA from ds_platform_utils.sql_utils import get_select_dev_query_tags if TYPE_CHECKING: @@ -42,6 +41,15 @@ } DEFAULT_TABLE_STATUS = "active" +# Value used by get_select_dev_query_tags when a derived field can't be resolved. +UNKNOWN_VALUE = "unknown" + + +def _owner_from_domain(domain: str) -> str: + """Map a domain to its owning team alias, e.g. ``advertising`` -> ``ds-advertising-team``.""" + return f"ds-{domain}-team" + + # All seven RFC tag names. TAG_OWNER = "TABLE_OWNER" TAG_TEAM = "TABLE_TEAM" @@ -91,10 +99,13 @@ def build_table_tags( ) -> Dict[str, str]: """Build the final ``{TAG_NAME: value}`` dict to apply to a published table. - OWNER / TEAM / DOMAIN / PROJECT are derived from the Metaflow run context (reusing - :func:`get_select_dev_query_tags`); STATUS defaults to ``active``. Any value may be - overridden via ``tags_override``. SLA and CONTACT are only included when supplied - via ``tags_override`` (they cannot be inferred). + TEAM / DOMAIN / PROJECT are derived from the Metaflow run context (reusing + :func:`get_select_dev_query_tags`); STATUS defaults to ``active``. 
OWNER is resolved + by priority: (1) an explicit ``owner`` override, else (2) the owning-team alias derived + from the (possibly overridden) domain -- ``ds-<domain>-team`` -- when the domain is + known, else (3) ``unknown``. (We deliberately don't use ``current.username`` for OWNER: + on deployed/argo runs it resolves to a service identity, not a person.) SLA and CONTACT + are only included when supplied via ``tags_override``. :param tags_override: Optional overrides, keyed by ``owner``/``TABLE_OWNER``/etc. :param current_obj: Optional Metaflow ``current`` stand-in (for testing). @@ -106,7 +117,6 @@ def build_table_tags( derived = get_select_dev_query_tags(current_obj=current_obj) tags: Dict[str, str] = { - TAG_OWNER: derived["user"], TAG_TEAM: derived["team"], TAG_DOMAIN: derived["domain"], TAG_PROJECT: derived["workload_id"], @@ -115,6 +125,15 @@ def build_table_tags( # SLA / CONTACT are only set when explicitly provided. tags.update(overrides) + # Resolve OWNER: explicit override wins; else derive a team alias from the (final) domain + # when it's known; else fall back to "unknown". + if TAG_OWNER not in overrides: + domain = tags.get(TAG_DOMAIN) + if domain and domain != UNKNOWN_VALUE: + tags[TAG_OWNER] = _owner_from_domain(domain) + else: + tags[TAG_OWNER] = UNKNOWN_VALUE + status = tags[TAG_STATUS] if status not in TABLE_STATUS_ALLOWED: raise ValueError(f"TABLE_STATUS must be one of {sorted(TABLE_STATUS_ALLOWED)}, got {status!r}.") @@ -146,10 +165,12 @@ def _validate_identifier(value: str, kind: str) -> None: raise ValueError(f"Invalid {kind} {value!r}; expected an unquoted identifier (letters/numbers/underscore).") -def build_set_tag_sql(table_name: str, tags: Dict[str, str], schema: str = PROD_SCHEMA) -> str: +def build_set_tag_sql(table_name: str, tags: Dict[str, str], schema: str) -> str: """Build a single ``ALTER TABLE ... SET TAG`` statement. - Tag definitions and the table both live in ``schema`` (``DATA_SCIENCE`` for prod).
+ Tags are co-located with the table they describe: the table and its tag *definitions* + both live in ``schema`` -- ``DATA_SCIENCE`` for prod tables, ``DATA_SCIENCE_STAGE`` for + dev/stage tables (the definitions must exist in each schema). :param table_name: Table to tag (upper-cased to match Snowflake's stored identifier). :param tags: Mapping of tag name to value (e.g. from :func:`build_table_tags`). @@ -172,31 +193,33 @@ def apply_table_tags( conn: "SnowflakeConnection", table_name: str, tags: Dict[str, str], - schema: str = PROD_SCHEMA, + schema: str, ) -> None: """Apply object tags to a published table, warning (never raising) on failure. - A failure here most commonly means the tag definitions have not yet been created by - an admin (see the RFC ``CREATE TAG`` setup). Because the table write has already - succeeded by this point, we log a clear warning and return rather than breaking the - publish. + A failure here most commonly means the tag definitions have not yet been created in + ``schema`` by an admin (see the RFC ``CREATE TAG`` setup), or the publishing role lacks + ``APPLY`` on the tags. Because the table write has already succeeded by this point, we + log a clear warning and return rather than breaking the publish. :param conn: Open Snowflake connection. :param table_name: Table to tag. :param tags: Mapping of tag name to value. - :param schema: Schema holding both the table and the tag definitions. + :param schema: Schema holding both the table and its tag definitions (prod or dev/stage). """ if not tags: return + target = f"{DATABASE}.{schema}.{table_name.upper()}" try: # Built inside the try so identifier-validation errors warn-and-skip rather than break publish. 
sql = build_set_tag_sql(table_name=table_name, tags=tags, schema=schema) _execute_sql(conn, sql) conn.commit() - print(f"Applied ownership tags to {DATABASE}.{schema}.{table_name.upper()}: {sorted(tags)}") + print(f"Applied ownership tags to {target}: {sorted(tags)}") except Exception as exc: # noqa: BLE001 -- tagging must never break a successful publish print( - f"Warning: failed to apply ownership tags to {DATABASE}.{schema}.{table_name.upper()} " - f"({exc}). The table was published successfully; tags were skipped. This usually means the " - f"tag definitions have not been created yet by a Snowflake admin (see the table-ownership RFC)." + f"Warning: failed to apply ownership tags to {target} ({exc}). The table was published " + f"successfully; tags were skipped. This usually means the tag definitions have not been " + f"created yet by a Snowflake admin, or the publishing role lacks APPLY on the tags " + f"(see the table-ownership RFC)." ) diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 0d79c81..512466d 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -93,9 +93,10 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) :param tags: Optional overrides for the ownership/governance object tags applied to the published table (see the table-ownership RFC). Keys may be `owner`/`team`/`domain`/`project`/`status`/`sla`/ - `contact` (optionally `TABLE_`-prefixed). OWNER/TEAM/DOMAIN/PROJECT are derived from the Metaflow - run context when not overridden, STATUS defaults to `active`, and SLA/CONTACT are only applied when - provided here. Tags are only applied to **production** tables; in non-prod runs no tags are applied. + `contact` (optionally `TABLE_`-prefixed). 
TEAM/DOMAIN/PROJECT are derived from the Metaflow + run context when not overridden; OWNER defaults to the owning-team alias `ds-<domain>-team` + (or `unknown` if the domain is unknown); STATUS defaults to `active`; SLA/CONTACT are only applied + when provided here. Tags are applied to every published table (prod and dev/stage), co-located with the table's schema. If the tag definitions have not yet been created by a Snowflake admin, tagging is skipped with a warning (the publish still succeeds). """ @@ -150,11 +151,10 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) use_logical_type=use_logical_type, ) - # Tag the published table (prod only). The S3 path has no open connection, so open one. - if current.is_production: - _tag_table_with_new_connection( - table_name=table_name, tags=table_tags, schema=schema, warehouse=warehouse, use_utc=use_utc - ) + # Tag the published table. The S3 path has no open connection, so open one. + _tag_table_with_new_connection( + table_name=table_name, tags=table_tags, schema=schema, warehouse=warehouse, use_utc=use_utc + ) else: conn: SnowflakeConnection = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) @@ -175,9 +175,8 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) use_logical_type=use_logical_type, ) - # Tag the published table (prod only), reusing the open connection before closing it. - if current.is_production: - apply_table_tags(conn=conn, table_name=table_name, tags=table_tags) + # Tag the published table, reusing the open connection before closing it.
+ apply_table_tags(conn=conn, table_name=table_name, tags=table_tags, schema=schema) conn.close() # Add a link to the table in Snowflake to the card @@ -207,7 +206,7 @@ def _tag_table_with_new_connection( tag_conn = None try: tag_conn = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) - apply_table_tags(conn=tag_conn, table_name=table_name, tags=tags) + apply_table_tags(conn=tag_conn, table_name=table_name, tags=tags, schema=schema) except Exception as exc: # noqa: BLE001 -- tagging must never break a successful publish print( f"Warning: failed to open a Snowflake connection to tag PATTERN_DB.{schema}.{table_name} " diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index 8b20fd0..7b3d119 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -7,6 +7,7 @@ from snowflake.connector.cursor import SnowflakeCursor from ds_platform_utils._snowflake.run_query import _execute_sql +from ds_platform_utils.metaflow._consts import DEV_SCHEMA, PROD_SCHEMA from ds_platform_utils.metaflow.snowflake_connection import get_snowflake_connection from ds_platform_utils.sql_utils import get_query_from_string_or_fpath @@ -46,9 +47,10 @@ def publish( # noqa: PLR0913, D417 :param use_utc: Whether to use UTC timezone for the Snowflake connection (affects timestamp fields). :param tags: Optional overrides for the ownership/governance object tags applied to the published table (see the table-ownership RFC). Keys may be ``owner``/``team``/``domain``/``project``/ - ``status``/``sla``/``contact`` (optionally ``TABLE_``-prefixed). OWNER/TEAM/DOMAIN/PROJECT are - derived from the Metaflow run context when not overridden, STATUS defaults to ``active``, and - SLA/CONTACT are only applied when provided here. Tags are only applied to **production** tables; + ``status``/``sla``/``contact`` (optionally ``TABLE_``-prefixed). 
TEAM/DOMAIN/PROJECT are + derived from the Metaflow run context when not overridden; OWNER defaults to the owning-team + alias ``ds-<domain>-team`` (or ``unknown`` if the domain is unknown); STATUS defaults to + ``active``; SLA/CONTACT are only applied when provided here. Tags are applied to every published table; in non-prod runs the table and its tag definitions live in the stage schema. If the tag definitions have not yet been created by a Snowflake admin, tagging is skipped with a warning (the publish still succeeds). @@ -99,9 +101,11 @@ def publish( # noqa: PLR0913, D417 ) last_op_was_write = operation.operation_type == "write" - # Tag the final table (prod only). Done after the SWAP so tags land on the live table. - if current.is_production: - apply_table_tags(conn=cur.connection, table_name=table_name, tags=table_tags) + # Tag the final table. Done after the SWAP so tags land on the live table. Applied to every + # table; the table lives in DATA_SCIENCE (prod) or DATA_SCIENCE_STAGE (dev), and its tag + # definitions are co-located with it, so the same schema is passed for both. + schema = PROD_SCHEMA if current.is_production else DEV_SCHEMA + apply_table_tags(conn=cur.connection, table_name=table_name, tags=table_tags, schema=schema) def update_card_with_operation_info( diff --git a/tests/functional_tests/metaflow/test__publish_tagging.py b/tests/functional_tests/metaflow/test__publish_tagging.py new file mode 100644 index 0000000..dce713f --- /dev/null +++ b/tests/functional_tests/metaflow/test__publish_tagging.py @@ -0,0 +1,169 @@ +"""Functional test: ownership object-tagging applied by publish() / publish_pandas(). + +Publishes a pandas DataFrame and a WAP table, each with a ``tags=`` override, then verifies +the ownership tags landed using the real-time ``INFORMATION_SCHEMA.TAG_REFERENCES`` function. + +This runs as a normal (non-production) flow, so the tables land in ``DATA_SCIENCE_STAGE`` and +tagging happens there.
The tag *definitions* are co-located with the table, so for this test they +must exist in ``DATA_SCIENCE_STAGE`` (not only in ``DATA_SCIENCE``). + +Preconditions (slow test against a live Snowflake account): + * A role that can write ``DATA_SCIENCE_STAGE`` and has ``APPLY`` on the seven ``TABLE_*`` tags. + * The tag definitions already exist (RFC §3 admin setup). Without them (or without APPLY), + tagging is skipped-with-a-warning and the assertions below fail -- the intended signal that + setup is incomplete. +""" + +import subprocess +import sys + +import pytest +from metaflow import FlowSpec, project, step + +# These flows run non-prod (--environment=local run), so tables land in the stage schema. +TABLE_SCHEMA = "DATA_SCIENCE_STAGE" + +PANDAS_TABLE = "DS_PLATFORM_UTILS_TAGGING_TEST_PUBLISH_PANDAS" +WAP_TABLE = "DS_PLATFORM_UTILS_TAGGING_TEST_PUBLISH" + +# publish_pandas: no owner override -> owner is derived from the domain (ds-<domain>-team). +PANDAS_TAGS = {"sla": "daily", "contact": "mlplatformteam@pattern.com"} +PANDAS_EXPECTED = { + "TABLE_OWNER": "ds-ml-platform-team", # derived from ds.domain:ml-platform + "TABLE_TEAM": "data-science", + "TABLE_DOMAIN": "ml-platform", + "TABLE_PROJECT": "ds-platform-utils-tests", + "TABLE_STATUS": "active", + "TABLE_SLA": "daily", + "TABLE_CONTACT": "mlplatformteam@pattern.com", +} + +# publish: explicit owner override -> the override wins over the domain derivation.
+WAP_TAGS = {"owner": "mlplatform_team", "sla": "daily", "contact": "mlplatformteam@pattern.com"} +WAP_EXPECTED = {**PANDAS_EXPECTED, "TABLE_OWNER": "mlplatform_team"} + + +@project(name="ds_platform_utils_tests") +class TestPublishTaggingFlow(FlowSpec): + """Publish two tables with ownership tags, then verify the tags were applied.""" + + @step + def start(self): + """Publish a pandas DataFrame with a tags override (owner derived from domain).""" + import pandas as pd + + from ds_platform_utils.metaflow import publish_pandas + + df = pd.DataFrame( + { + "id": [1, 2, 3, 4, 5], + "name": ["Mario", "Luigi", "Peach", "Bowser", "Toad"], + "score": [90.5, 85.2, 88.7, 92.1, 78.9], + } + ) + + publish_pandas( + table_name=PANDAS_TABLE, + df=df, + auto_create_table=True, + overwrite=True, + tags=PANDAS_TAGS, + ) + + self.next(self.publish_step) + + @step + def publish_step(self): + """Publish a WAP table with an explicit owner override.""" + from ds_platform_utils.metaflow import publish + + query = """ + CREATE OR REPLACE TABLE PATTERN_DB.{{schema}}.{{table_name}} ( + id INT, + name STRING + ); + + INSERT INTO PATTERN_DB.{{schema}}.{{table_name}} (id, name) + VALUES (1, 'Mario'), (2, 'Luigi'), (3, 'Peach'); + """ + + publish( + table_name=WAP_TABLE, + query=query, + tags=WAP_TAGS, + ) + + self.next(self.verify_tags) + + @step + def verify_tags(self): + """Read tags back via the real-time INFORMATION_SCHEMA.TAG_REFERENCES function.""" + for table_name, expected in ((PANDAS_TABLE, PANDAS_EXPECTED), (WAP_TABLE, WAP_EXPECTED)): + actual = _fetch_tags(table_name) + for tag, expected_value in expected.items(): + assert actual.get(tag) == expected_value, ( + f"{table_name}: expected {tag}={expected_value!r}, got {actual.get(tag)!r}. 
All tags: {actual}" + ) + + self.next(self.end) + + @step + def end(self): + """End the flow.""" + pass + + +def _fetch_tags(table_name: str) -> dict: + """Return ``{TAG_NAME: TAG_VALUE}`` for a stage table using the real-time tag-references function.""" + from ds_platform_utils.metaflow import query_pandas_from_snowflake + + query = f""" + SELECT tag_name, tag_value + FROM TABLE( + INFORMATION_SCHEMA.TAG_REFERENCES('PATTERN_DB.{TABLE_SCHEMA}.{table_name}', 'TABLE') + ); + """ + df = query_pandas_from_snowflake(query) + # query_pandas_from_snowflake lower-cases column names. + return {row.tag_name: row.tag_value for row in df.itertuples()} + + +if __name__ == "__main__": + TestPublishTaggingFlow() + + +@pytest.mark.slow +def test_publish_tagging_flow(): + """Run the flow and assert the ownership tags are applied and readable.""" + cmd = [ + sys.executable, + __file__, + "--environment=local", + "--with=card", + "run", + "--tag=ds.domain:ml-platform", + "--tag=ds.project:ds-platform-utils-tests", + ] + + print("\n=== Metaflow Output ===") + for line in execute_with_output(cmd): + print(line, end="") + + +def execute_with_output(cmd): + """Execute a command and yield output lines as they are produced.""" + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # Merge stderr into stdout + universal_newlines=True, + bufsize=1, + ) + + for line in iter(process.stdout.readline, ""): + yield line + + process.stdout.close() + return_code = process.wait() + if return_code: + raise subprocess.CalledProcessError(return_code, cmd) diff --git a/tests/unit_tests/snowflake/test__object_tags.py b/tests/unit_tests/snowflake/test__object_tags.py index 2e40a6a..dc8e63a 100644 --- a/tests/unit_tests/snowflake/test__object_tags.py +++ b/tests/unit_tests/snowflake/test__object_tags.py @@ -22,10 +22,11 @@ class FakeCurrent: def test_build_table_tags_derives_all_mappings(): - """All four context-derived tags + default STATUS are present; SLA/CONTACT 
omitted.""" + """Context-derived tags + default STATUS present; OWNER = team alias from domain; SLA/CONTACT omitted.""" tags = build_table_tags(current_obj=FakeCurrent()) - assert tags["TABLE_OWNER"] == "john_doe" + # OWNER is derived from the domain (not current.username) when no override is given. + assert tags["TABLE_OWNER"] == "ds-recommendations-team" assert tags["TABLE_TEAM"] == "data-science" assert tags["TABLE_DOMAIN"] == "recommendations" assert tags["TABLE_PROJECT"] == "two_tower_v2" @@ -34,6 +35,33 @@ def test_build_table_tags_derives_all_mappings(): assert "TABLE_CONTACT" not in tags +def test_build_table_tags_owner_falls_back_to_unknown_without_domain(): + """When the domain can't be resolved, OWNER falls back to 'unknown'.""" + + class NoDomainCurrent(FakeCurrent): + tags = [] # no ds.domain / ds.project -> domain resolves to "unknown" + + tags = build_table_tags(current_obj=NoDomainCurrent()) + + assert tags["TABLE_DOMAIN"] == "unknown" + assert tags["TABLE_OWNER"] == "unknown" + + +def test_build_table_tags_owner_follows_overridden_domain(): + """Overriding the domain (without overriding owner) re-derives the owner team alias.""" + tags = build_table_tags(tags_override={"domain": "advertising"}, current_obj=FakeCurrent()) + + assert tags["TABLE_DOMAIN"] == "advertising" + assert tags["TABLE_OWNER"] == "ds-advertising-team" + + +def test_build_table_tags_explicit_owner_beats_domain_derivation(): + """An explicit owner override wins over the domain-derived team alias.""" + tags = build_table_tags(tags_override={"owner": "jane"}, current_obj=FakeCurrent()) + + assert tags["TABLE_OWNER"] == "jane" + + def test_build_table_tags_overrides_win(): """Overrides (incl. 
alias + cased keys) replace derived values and add SLA/CONTACT.""" tags = build_table_tags( @@ -62,17 +90,30 @@ def test_build_table_tags_unknown_key_raises(): def test_build_set_tag_sql_format_and_escaping(): - """SQL targets DATA_SCIENCE for both table and tag, upper-cases the table, escapes quotes.""" - sql = build_set_tag_sql(table_name="my_table", tags={"TABLE_OWNER": "o'brien"}) + """SQL targets the table schema for the object, upper-cases the table, escapes quotes.""" + sql = build_set_tag_sql(table_name="my_table", tags={"TABLE_OWNER": "o'brien"}, schema="DATA_SCIENCE") assert "ALTER TABLE PATTERN_DB.DATA_SCIENCE.MY_TABLE" in sql assert "PATTERN_DB.DATA_SCIENCE.TABLE_OWNER = 'o''brien'" in sql assert sql.strip().endswith(";") +def test_build_set_tag_sql_dev_table_uses_stage_schema(): + """A dev table is tagged with tag definitions co-located in DATA_SCIENCE_STAGE.""" + sql = build_set_tag_sql( + table_name="my_table", + tags={"TABLE_OWNER": "ds-advertising-team"}, + schema="DATA_SCIENCE_STAGE", + ) + + # Both the object and the tag definition are referenced from the stage schema. 
+ assert "ALTER TABLE PATTERN_DB.DATA_SCIENCE_STAGE.MY_TABLE" in sql + assert "PATTERN_DB.DATA_SCIENCE_STAGE.TABLE_OWNER = 'ds-advertising-team'" in sql + + def test_build_set_tag_sql_empty_raises(): with pytest.raises(ValueError, match="No tags to apply"): - build_set_tag_sql(table_name="t", tags={}) + build_set_tag_sql(table_name="t", tags={}, schema="DATA_SCIENCE") def test_build_set_tag_sql_multiple_tags_joined(): @@ -80,6 +121,7 @@ def test_build_set_tag_sql_multiple_tags_joined(): sql = build_set_tag_sql( table_name="my_table", tags={"TABLE_OWNER": "john_doe", "TABLE_TEAM": "data-science", "TABLE_STATUS": "active"}, + schema="DATA_SCIENCE", ) assert sql.count("SET TAG") == 1 @@ -102,13 +144,13 @@ def test_build_table_tags_drops_empty_override_value(): def test_build_set_tag_sql_rejects_invalid_table_name(bad_table): """Non-identifier table names are rejected before reaching SQL.""" with pytest.raises(ValueError, match="Invalid table_name"): - build_set_tag_sql(table_name=bad_table, tags={"TABLE_OWNER": "john_doe"}) + build_set_tag_sql(table_name=bad_table, tags={"TABLE_OWNER": "john_doe"}, schema="DATA_SCIENCE") def test_build_set_tag_sql_rejects_invalid_tag_name(): """Non-identifier tag names are rejected.""" with pytest.raises(ValueError, match="Invalid tag name"): - build_set_tag_sql(table_name="my_table", tags={"TABLE_OWNER; DROP": "x"}) + build_set_tag_sql(table_name="my_table", tags={"TABLE_OWNER; DROP": "x"}, schema="DATA_SCIENCE") def test_build_set_tag_sql_rejects_invalid_schema(): @@ -135,7 +177,7 @@ def _boom(*_args, **_kwargs): monkeypatch.setattr(object_tags, "_execute_sql", _boom) conn = FakeConn() - apply_table_tags(conn=conn, table_name="my_table", tags={"TABLE_OWNER": "john_doe"}) + apply_table_tags(conn=conn, table_name="my_table", tags={"TABLE_OWNER": "john_doe"}, schema="DATA_SCIENCE") assert conn.committed is False assert "Warning: failed to apply ownership tags" in capsys.readouterr().out @@ -153,7 +195,9 @@ def _spy(*_args, **_kwargs): 
conn = FakeConn() # A malformed table name would otherwise build invalid/injectable SQL. - apply_table_tags(conn=conn, table_name="bad; DROP TABLE x", tags={"TABLE_OWNER": "john_doe"}) + apply_table_tags( + conn=conn, table_name="bad; DROP TABLE x", tags={"TABLE_OWNER": "john_doe"}, schema="DATA_SCIENCE" + ) assert executed is False # never reached execution assert conn.committed is False @@ -171,7 +215,7 @@ def _capture(conn, sql): monkeypatch.setattr(object_tags, "_execute_sql", _capture) conn = FakeConn() - apply_table_tags(conn=conn, table_name="my_table", tags={"TABLE_OWNER": "john_doe"}) + apply_table_tags(conn=conn, table_name="my_table", tags={"TABLE_OWNER": "john_doe"}, schema="DATA_SCIENCE") assert captured["conn"] is conn assert "ALTER TABLE PATTERN_DB.DATA_SCIENCE.MY_TABLE" in captured["sql"] @@ -191,7 +235,7 @@ def _spy(*_args, **_kwargs): monkeypatch.setattr(object_tags, "_execute_sql", _spy) conn = FakeConn() - apply_table_tags(conn=conn, table_name="my_table", tags={}) + apply_table_tags(conn=conn, table_name="my_table", tags={}, schema="DATA_SCIENCE") assert called is False assert conn.committed is False From b79f9f9408ac8eaeea6c36c2a0fcc6d30a2f1d1d Mon Sep 17 00:00:00 2001 From: vinay79n Date: Tue, 23 Jun 2026 13:04:44 +0530 Subject: [PATCH 6/7] revert: tag production tables only, drop dev tagging + functional test Team decided dev (DATA_SCIENCE_STAGE) tables shouldn't be tagged. Re-gate publish()/publish_pandas() on current.is_production; keep the owner-from- domain derivation, validation, and expanded allowed values. 
--- docs/metaflow/publish.md | 16 +- docs/metaflow/publish_pandas.md | 8 +- .../_snowflake/object_tags.py | 12 +- src/ds_platform_utils/metaflow/pandas.py | 21 +-- .../metaflow/write_audit_publish.py | 9 +- .../metaflow/test__publish_tagging.py | 169 ------------------ .../unit_tests/snowflake/test__object_tags.py | 15 +- 7 files changed, 34 insertions(+), 216 deletions(-) delete mode 100644 tests/functional_tests/metaflow/test__publish_tagging.py diff --git a/docs/metaflow/publish.md b/docs/metaflow/publish.md index 4ac2205..8716370 100644 --- a/docs/metaflow/publish.md +++ b/docs/metaflow/publish.md @@ -23,7 +23,7 @@ publish( - Reads SQL from a string or `.sql` path. - Runs write/audit/publish operations through Snowflake. - Adds operation details and table links to the Metaflow card when available. -- **Automatically applies ownership object tags to every published table** (see +- **Automatically applies ownership object tags to production tables** (see [Ownership tags](#ownership-tags) below). ## Parameters @@ -54,8 +54,8 @@ publish( ## Ownership tags -On **every** publish, `publish()` automatically applies the table-ownership object tags -from the table-ownership RFC. The seven tags are: +When publishing to **production**, `publish()` automatically applies the table-ownership +object tags from the table-ownership RFC. The seven tags are: | Tag | Source | Always set? | | --------------- | ------------------------------------------------------- | --------------- | @@ -96,13 +96,9 @@ publish( Notes: -- Tags are applied to **every** published table — prod tables in `DATA_SCIENCE` and - dev/stage tables in `DATA_SCIENCE_STAGE`. Tags are **co-located with the table's schema**: - a prod table references the tag definitions in `DATA_SCIENCE`, a dev table references - those in `DATA_SCIENCE_STAGE`. So the definitions must exist in **each** schema, and the - publishing role needs `APPLY` on the tags **in that schema** (e.g. 
the dev role needs - `APPLY` on the `DATA_SCIENCE_STAGE` tags). -- The tag *definitions* must first be created once by a Snowflake admin in each schema +- Tags are applied **only to production tables** (`DATA_SCIENCE`). Non-prod + (`DATA_SCIENCE_STAGE`) runs apply no tags. The publishing role needs `APPLY` on the tags. +- The tag *definitions* must first be created once by a Snowflake admin in `DATA_SCIENCE` (the RFC `CREATE TAG` setup). Until then, tagging is **skipped with a warning** — the publish still succeeds. - Invalid `status`/`sla` values raise `ValueError` before any data is written. diff --git a/docs/metaflow/publish_pandas.md b/docs/metaflow/publish_pandas.md index 0892978..b9d5f51 100644 --- a/docs/metaflow/publish_pandas.md +++ b/docs/metaflow/publish_pandas.md @@ -31,7 +31,7 @@ publish_pandas( - Validates DataFrame input. - Writes directly via `write_pandas` or via S3 stage flow for large data. - Adds a Snowflake table URL to Metaflow card output. -- **Automatically applies ownership object tags to every published table** (see +- **Automatically applies ownership object tags to production tables** (see [Ownership tags](#ownership-tags) below). ## Parameters @@ -58,7 +58,7 @@ publish_pandas( ## Ownership tags -On **every** publish, `publish_pandas()` automatically applies the same +When publishing to **production**, `publish_pandas()` automatically applies the same seven table-ownership object tags as [`publish`](publish.md#ownership-tags): `TABLE_OWNER`, `TABLE_TEAM`, `TABLE_DOMAIN`, `TABLE_PROJECT`, `TABLE_STATUS` and (when provided via `tags=`) `TABLE_SLA` / `TABLE_CONTACT`. @@ -71,9 +71,7 @@ publish_pandas( ) ``` -- Tags are applied to **every** published table — prod in `DATA_SCIENCE`, dev/stage in - `DATA_SCIENCE_STAGE`. Tags are co-located with the table's schema, so the definitions - must exist in **each** schema and the publishing role needs `APPLY` on the tags there. 
+- Tags are applied **only to production tables** (`DATA_SCIENCE`); non-prod runs apply none. - `TABLE_DOMAIN` / `TABLE_PROJECT` come from the `ds.domain` / `ds.project` Metaflow tags; if a flow runs without them they fall back to the literal `unknown` and a warning is printed. Ensure the flow carries those tags (automatic in CI / standard `poe` commands) diff --git a/src/ds_platform_utils/_snowflake/object_tags.py b/src/ds_platform_utils/_snowflake/object_tags.py index 14186ed..cada3a9 100644 --- a/src/ds_platform_utils/_snowflake/object_tags.py +++ b/src/ds_platform_utils/_snowflake/object_tags.py @@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Dict, Optional from ds_platform_utils._snowflake.run_query import _execute_sql +from ds_platform_utils.metaflow._consts import PROD_SCHEMA from ds_platform_utils.sql_utils import get_select_dev_query_tags if TYPE_CHECKING: @@ -165,12 +166,11 @@ def _validate_identifier(value: str, kind: str) -> None: raise ValueError(f"Invalid {kind} {value!r}; expected an unquoted identifier (letters/numbers/underscore).") -def build_set_tag_sql(table_name: str, tags: Dict[str, str], schema: str) -> str: +def build_set_tag_sql(table_name: str, tags: Dict[str, str], schema: str = PROD_SCHEMA) -> str: """Build a single ``ALTER TABLE ... SET TAG`` statement. - Tags are co-located with the table they describe: the table and its tag *definitions* - both live in ``schema`` -- ``DATA_SCIENCE`` for prod tables, ``DATA_SCIENCE_STAGE`` for - dev/stage tables (the definitions must exist in each schema). + Only production tables are tagged, so the table and its tag *definitions* both live in + ``schema`` (``DATA_SCIENCE``). :param table_name: Table to tag (upper-cased to match Snowflake's stored identifier). :param tags: Mapping of tag name to value (e.g. from :func:`build_table_tags`). 
@@ -193,7 +193,7 @@ def apply_table_tags( conn: "SnowflakeConnection", table_name: str, tags: Dict[str, str], - schema: str, + schema: str = PROD_SCHEMA, ) -> None: """Apply object tags to a published table, warning (never raising) on failure. @@ -205,7 +205,7 @@ def apply_table_tags( :param conn: Open Snowflake connection. :param table_name: Table to tag. :param tags: Mapping of tag name to value. - :param schema: Schema holding both the table and its tag definitions (prod or dev/stage). + :param schema: Schema holding both the table and its tag definitions (``DATA_SCIENCE``). """ if not tags: return diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 512466d..1e09a54 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -151,10 +151,11 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) use_logical_type=use_logical_type, ) - # Tag the published table. The S3 path has no open connection, so open one. - _tag_table_with_new_connection( - table_name=table_name, tags=table_tags, schema=schema, warehouse=warehouse, use_utc=use_utc - ) + # Tag the published table (prod only). The S3 path has no open connection, so open one. + if current.is_production: + _tag_table_with_new_connection( + table_name=table_name, tags=table_tags, warehouse=warehouse, use_utc=use_utc + ) else: conn: SnowflakeConnection = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) @@ -175,8 +176,9 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) use_logical_type=use_logical_type, ) - # Tag the published table, reusing the open connection before closing it. - apply_table_tags(conn=conn, table_name=table_name, tags=table_tags, schema=schema) + # Tag the published table (prod only), reusing the open connection before closing it. 
+ if current.is_production: + apply_table_tags(conn=conn, table_name=table_name, tags=table_tags) conn.close() # Add a link to the table in Snowflake to the card @@ -191,11 +193,10 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) def _tag_table_with_new_connection( table_name: str, tags: Dict[str, str], - schema: str, warehouse: Optional[Union[Literal["XS", "MED", "XL"], str]], use_utc: bool, ) -> None: - """Open a short-lived connection and tag an already-published table. + """Open a short-lived connection and tag an already-published (production) table. Used by the S3-stage publish path, which has no open connection. Opening the connection happens outside ``apply_table_tags``' own error handling, so we guard it @@ -206,10 +207,10 @@ def _tag_table_with_new_connection( tag_conn = None try: tag_conn = get_snowflake_connection(warehouse=warehouse, use_utc=use_utc) - apply_table_tags(conn=tag_conn, table_name=table_name, tags=tags, schema=schema) + apply_table_tags(conn=tag_conn, table_name=table_name, tags=tags) except Exception as exc: # noqa: BLE001 -- tagging must never break a successful publish print( - f"Warning: failed to open a Snowflake connection to tag PATTERN_DB.{schema}.{table_name} " + f"Warning: failed to open a Snowflake connection to tag {table_name} " f"({exc}). The table was published successfully; tags were skipped." 
) finally: diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index 7b3d119..f0ae71a 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -7,7 +7,6 @@ from snowflake.connector.cursor import SnowflakeCursor from ds_platform_utils._snowflake.run_query import _execute_sql -from ds_platform_utils.metaflow._consts import DEV_SCHEMA, PROD_SCHEMA from ds_platform_utils.metaflow.snowflake_connection import get_snowflake_connection from ds_platform_utils.sql_utils import get_query_from_string_or_fpath @@ -101,11 +100,9 @@ def publish( # noqa: PLR0913, D417 ) last_op_was_write = operation.operation_type == "write" - # Tag the final table. Done after the SWAP so tags land on the live table. Applied to every - # table; the table lives in DATA_SCIENCE (prod) or DATA_SCIENCE_STAGE (dev), while the tag - # definitions always live in DATA_SCIENCE (the apply_table_tags default tag_schema). - schema = PROD_SCHEMA if current.is_production else DEV_SCHEMA - apply_table_tags(conn=cur.connection, table_name=table_name, tags=table_tags, schema=schema) + # Tag the final table (prod only). Done after the SWAP so tags land on the live table. + if current.is_production: + apply_table_tags(conn=cur.connection, table_name=table_name, tags=table_tags) def update_card_with_operation_info( diff --git a/tests/functional_tests/metaflow/test__publish_tagging.py b/tests/functional_tests/metaflow/test__publish_tagging.py deleted file mode 100644 index dce713f..0000000 --- a/tests/functional_tests/metaflow/test__publish_tagging.py +++ /dev/null @@ -1,169 +0,0 @@ -"""Functional test: ownership object-tagging applied by publish() / publish_pandas(). - -Publishes a pandas DataFrame and a WAP table, each with a ``tags=`` override, then verifies -the ownership tags landed using the real-time ``INFORMATION_SCHEMA.TAG_REFERENCES`` function. 
- -This runs as a normal (non-production) flow, so the tables land in ``DATA_SCIENCE_STAGE`` and -tagging happens there. The tag *definitions* live in ``DATA_SCIENCE`` -- Snowflake applies a -``DATA_SCIENCE``-defined tag to a ``DATA_SCIENCE_STAGE`` object fine. - -Preconditions (slow test against a live Snowflake account): - * A role that can write ``DATA_SCIENCE_STAGE`` and has ``APPLY`` on the seven ``TABLE_*`` tags. - * The tag definitions already exist (RFC §3 admin setup). Without them (or without APPLY), - tagging is skipped-with-a-warning and the assertions below fail -- the intended signal that - setup is incomplete. -""" - -import subprocess -import sys - -import pytest -from metaflow import FlowSpec, project, step - -# These flows run non-prod (--environment=local run), so tables land in the stage schema. -TABLE_SCHEMA = "DATA_SCIENCE_STAGE" - -PANDAS_TABLE = "DS_PLATFORM_UTILS_TAGGING_TEST_PUBLISH_PANDAS" -WAP_TABLE = "DS_PLATFORM_UTILS_TAGGING_TEST_PUBLISH" - -# publish_pandas: no owner override -> owner is derived from the domain (ds-<domain>-team). -PANDAS_TAGS = {"sla": "daily", "contact": "mlplatformteam@pattern.com"} -PANDAS_EXPECTED = { - "TABLE_OWNER": "ds-ml-platform-team", # derived from ds.domain:ml-platform - "TABLE_TEAM": "data-science", - "TABLE_DOMAIN": "ml-platform", - "TABLE_PROJECT": "ds-platform-utils-tests", - "TABLE_STATUS": "active", - "TABLE_SLA": "daily", - "TABLE_CONTACT": "mlplatformteam@pattern.com", -} - -# publish: explicit owner override -> the override wins over the domain derivation.
-WAP_TAGS = {"owner": "mlplatform_team", "sla": "daily", "contact": "mlplatformteam@pattern.com"} -WAP_EXPECTED = {**PANDAS_EXPECTED, "TABLE_OWNER": "mlplatform_team"} - - -@project(name="ds_platform_utils_tests") -class TestPublishTaggingFlow(FlowSpec): - """Publish two tables with ownership tags, then verify the tags were applied.""" - - @step - def start(self): - """Publish a pandas DataFrame with a tags override (owner derived from domain).""" - import pandas as pd - - from ds_platform_utils.metaflow import publish_pandas - - df = pd.DataFrame( - { - "id": [1, 2, 3, 4, 5], - "name": ["Mario", "Luigi", "Peach", "Bowser", "Toad"], - "score": [90.5, 85.2, 88.7, 92.1, 78.9], - } - ) - - publish_pandas( - table_name=PANDAS_TABLE, - df=df, - auto_create_table=True, - overwrite=True, - tags=PANDAS_TAGS, - ) - - self.next(self.publish_step) - - @step - def publish_step(self): - """Publish a WAP table with an explicit owner override.""" - from ds_platform_utils.metaflow import publish - - query = """ - CREATE OR REPLACE TABLE PATTERN_DB.{{schema}}.{{table_name}} ( - id INT, - name STRING - ); - - INSERT INTO PATTERN_DB.{{schema}}.{{table_name}} (id, name) - VALUES (1, 'Mario'), (2, 'Luigi'), (3, 'Peach'); - """ - - publish( - table_name=WAP_TABLE, - query=query, - tags=WAP_TAGS, - ) - - self.next(self.verify_tags) - - @step - def verify_tags(self): - """Read tags back via the real-time INFORMATION_SCHEMA.TAG_REFERENCES function.""" - for table_name, expected in ((PANDAS_TABLE, PANDAS_EXPECTED), (WAP_TABLE, WAP_EXPECTED)): - actual = _fetch_tags(table_name) - for tag, expected_value in expected.items(): - assert actual.get(tag) == expected_value, ( - f"{table_name}: expected {tag}={expected_value!r}, got {actual.get(tag)!r}. 
All tags: {actual}" - ) - - self.next(self.end) - - @step - def end(self): - """End the flow.""" - pass - - -def _fetch_tags(table_name: str) -> dict: - """Return ``{TAG_NAME: TAG_VALUE}`` for a stage table using the real-time tag-references function.""" - from ds_platform_utils.metaflow import query_pandas_from_snowflake - - query = f""" - SELECT tag_name, tag_value - FROM TABLE( - INFORMATION_SCHEMA.TAG_REFERENCES('PATTERN_DB.{TABLE_SCHEMA}.{table_name}', 'TABLE') - ); - """ - df = query_pandas_from_snowflake(query) - # query_pandas_from_snowflake lower-cases column names. - return {row.tag_name: row.tag_value for row in df.itertuples()} - - -if __name__ == "__main__": - TestPublishTaggingFlow() - - -@pytest.mark.slow -def test_publish_tagging_flow(): - """Run the flow and assert the ownership tags are applied and readable.""" - cmd = [ - sys.executable, - __file__, - "--environment=local", - "--with=card", - "run", - "--tag=ds.domain:ml-platform", - "--tag=ds.project:ds-platform-utils-tests", - ] - - print("\n=== Metaflow Output ===") - for line in execute_with_output(cmd): - print(line, end="") - - -def execute_with_output(cmd): - """Execute a command and yield output lines as they are produced.""" - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, # Merge stderr into stdout - universal_newlines=True, - bufsize=1, - ) - - for line in iter(process.stdout.readline, ""): - yield line - - process.stdout.close() - return_code = process.wait() - if return_code: - raise subprocess.CalledProcessError(return_code, cmd) diff --git a/tests/unit_tests/snowflake/test__object_tags.py b/tests/unit_tests/snowflake/test__object_tags.py index dc8e63a..6d8ebdb 100644 --- a/tests/unit_tests/snowflake/test__object_tags.py +++ b/tests/unit_tests/snowflake/test__object_tags.py @@ -98,17 +98,12 @@ def test_build_set_tag_sql_format_and_escaping(): assert sql.strip().endswith(";") -def test_build_set_tag_sql_dev_table_uses_stage_schema(): - """A 
dev table is tagged with tag definitions co-located in DATA_SCIENCE_STAGE.""" - sql = build_set_tag_sql( - table_name="my_table", - tags={"TABLE_OWNER": "ds-advertising-team"}, - schema="DATA_SCIENCE_STAGE", - ) +def test_build_set_tag_sql_defaults_to_data_science(): + """Schema defaults to DATA_SCIENCE (only production tables are tagged).""" + sql = build_set_tag_sql(table_name="my_table", tags={"TABLE_OWNER": "ds-advertising-team"}) - # Both the object and the tag definition are referenced from the stage schema. - assert "ALTER TABLE PATTERN_DB.DATA_SCIENCE_STAGE.MY_TABLE" in sql - assert "PATTERN_DB.DATA_SCIENCE_STAGE.TABLE_OWNER = 'ds-advertising-team'" in sql + assert "ALTER TABLE PATTERN_DB.DATA_SCIENCE.MY_TABLE" in sql + assert "PATTERN_DB.DATA_SCIENCE.TABLE_OWNER = 'ds-advertising-team'" in sql def test_build_set_tag_sql_empty_raises(): From c77b85d4d9c8f0725dcebf0e370b0d9c00aa99e4 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 24 Jun 2026 16:51:09 +0530 Subject: [PATCH 7/7] feat: update TABLE_OWNER resolution logic to prioritize ds.owner flow tag and enhance related documentation --- docs/metaflow/publish.md | 16 ++++++++------ .../_snowflake/object_tags.py | 19 +++++++++------- src/ds_platform_utils/metaflow/pandas.py | 6 ++--- .../metaflow/write_audit_publish.py | 5 +++-- src/ds_platform_utils/sql_utils.py | 1 + .../unit_tests/snowflake/test__object_tags.py | 22 +++++++++++++++++++ 6 files changed, 49 insertions(+), 20 deletions(-) diff --git a/docs/metaflow/publish.md b/docs/metaflow/publish.md index 8716370..31ca8ed 100644 --- a/docs/metaflow/publish.md +++ b/docs/metaflow/publish.md @@ -59,7 +59,7 @@ object tags from the table-ownership RFC. The seven tags are: | Tag | Source | Always set? 
| | --------------- | ------------------------------------------------------- | --------------- | -| `TABLE_OWNER` | owning-team alias derived from the domain (`ds-<domain>-team`), else `unknown` | yes | +| `TABLE_OWNER` | `ds.owner` flow tag, else owning-team alias derived from the domain (`ds-<domain>-team`), else `unknown` | yes | | `TABLE_TEAM` | `data-science` | yes | | `TABLE_DOMAIN` | `ds.domain` Metaflow tag, else `unknown` | yes | | `TABLE_PROJECT` | `ds.project` Metaflow tag, else `unknown` | yes | @@ -67,12 +67,14 @@ object tags from the table-ownership RFC. The seven tags are: | `TABLE_SLA` | override only (`streaming`/`realtime`/`hourly`/`daily`/`weekly`/`monthly`/`quarterly`/`ad_hoc`/`on_demand`) | only if given | | `TABLE_CONTACT` | override only (Slack channel or email) | only if given | -> **`TABLE_OWNER` is derived from the domain, not the run user.** Owner is resolved by -> priority: (1) an explicit `tags={"owner": ...}` override, else (2) the owning-team alias -> `ds-<domain>-team` when the domain is known (e.g. domain `advertising` → -> `ds-advertising-team`), else (3) `unknown`. We don't use `current.username`, because on -> deployed/argo runs it resolves to a service identity (`argo-workflows`) rather than a -> person. Pass `tags={"owner": ...}` to set a specific individual or alias. +> **`TABLE_OWNER` is not the run user.** Owner is resolved by priority: +> (1) an explicit `tags={"owner": ...}` override, else +> (2) the **`ds.owner`** Metaflow flow tag (set in CI alongside `ds.domain`/`ds.project`), else +> (3) the owning-team alias `ds-<domain>-team` when the domain is known (e.g. domain +> `advertising` → `ds-advertising-team`), else (4) `unknown`. We don't use `current.username`, +> because on deployed/argo runs it resolves to a service identity (`argo-workflows`) rather +> than a person. Set `ds.owner` on the flow for a per-flow owner, or pass `tags={"owner": ...}` +> per call.
> **`TABLE_DOMAIN` / `TABLE_PROJECT` depend on flow tags.** These are read from the > `ds.domain` / `ds.project` Metaflow tags. If a flow runs without them, the value falls diff --git a/src/ds_platform_utils/_snowflake/object_tags.py b/src/ds_platform_utils/_snowflake/object_tags.py index cada3a9..2e8dfd1 100644 --- a/src/ds_platform_utils/_snowflake/object_tags.py +++ b/src/ds_platform_utils/_snowflake/object_tags.py @@ -102,11 +102,11 @@ def build_table_tags( TEAM / DOMAIN / PROJECT are derived from the Metaflow run context (reusing :func:`get_select_dev_query_tags`); STATUS defaults to ``active``. OWNER is resolved - by priority: (1) an explicit ``owner`` override, else (2) the owning-team alias derived - from the (possibly overridden) domain -- ``ds-<domain>-team`` -- when the domain is - known, else (3) ``unknown``. (We deliberately don't use ``current.username`` for OWNER: - on deployed/argo runs it resolves to a service identity, not a person.) SLA and CONTACT - are only included when supplied via ``tags_override``. + by priority: (1) an explicit ``owner`` override, else (2) the ``ds.owner`` flow tag, + else (3) the owning-team alias derived from the (possibly overridden) domain -- + ``ds-<domain>-team`` -- when the domain is known, else (4) ``unknown``. (We deliberately + don't use ``current.username`` for OWNER: on deployed/argo runs it resolves to a service + identity, not a person.) SLA and CONTACT are only included when supplied via ``tags_override``. :param tags_override: Optional overrides, keyed by ``owner``/``TABLE_OWNER``/etc. :param current_obj: Optional Metaflow ``current`` stand-in (for testing). @@ -126,11 +126,14 @@ def build_table_tags( # SLA / CONTACT are only set when explicitly provided. tags.update(overrides) - # Resolve OWNER: explicit override wins; else derive a team alias from the (final) domain - # when it's known; else fall back to "unknown".
+ # Resolve OWNER by priority: (1) explicit override, (2) the `ds.owner` flow tag, (3) the + # team alias derived from the (final) domain, (4) "unknown". if TAG_OWNER not in overrides: + ds_owner = derived.get("owner") domain = tags.get(TAG_DOMAIN) - if domain and domain != UNKNOWN_VALUE: + if ds_owner and ds_owner != UNKNOWN_VALUE: + tags[TAG_OWNER] = ds_owner + elif domain and domain != UNKNOWN_VALUE: tags[TAG_OWNER] = _owner_from_domain(domain) else: tags[TAG_OWNER] = UNKNOWN_VALUE diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 1e09a54..5266e0a 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -94,9 +94,9 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) :param tags: Optional overrides for the ownership/governance object tags applied to the published table (see the table-ownership RFC). Keys may be `owner`/`team`/`domain`/`project`/`status`/`sla`/ `contact` (optionally `TABLE_`-prefixed). TEAM/DOMAIN/PROJECT are derived from the Metaflow - run context when not overridden; OWNER defaults to the owning-team alias `ds-<domain>-team` - (or `unknown` if the domain is unknown); STATUS defaults to `active`; SLA/CONTACT are only applied - when provided here. Tags are only applied to **production** tables; in non-prod runs no tags are applied. + run context when not overridden; OWNER is resolved by priority -- explicit `owner` override → + `ds.owner` flow tag → owning-team alias `ds-<domain>-team` → `unknown`; STATUS defaults to + `active`; SLA/CONTACT are only applied when provided here. Tags are only applied to **production** tables; in non-prod runs no tags are applied. If the tag definitions have not yet been created by a Snowflake admin, tagging is skipped with a warning (the publish still succeeds).
""" diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index f0ae71a..9ee1356 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -47,8 +47,9 @@ def publish( # noqa: PLR0913, D417 :param tags: Optional overrides for the ownership/governance object tags applied to the published table (see the table-ownership RFC). Keys may be ``owner``/``team``/``domain``/``project``/ ``status``/``sla``/``contact`` (optionally ``TABLE_``-prefixed). TEAM/DOMAIN/PROJECT are - derived from the Metaflow run context when not overridden; OWNER defaults to the owning-team - alias ``ds-<domain>-team`` (or ``unknown`` if the domain is unknown); STATUS defaults to + derived from the Metaflow run context when not overridden; OWNER is resolved by priority -- + explicit ``owner`` override → ``ds.owner`` flow tag → owning-team alias ``ds-<domain>-team`` → + ``unknown``; STATUS defaults to ``active``; SLA/CONTACT are only applied when provided here. Tags are only applied to **production** tables; in non-prod runs no tags are applied. If the tag definitions have not yet been created by a Snowflake admin, tagging is skipped with a warning (the publish still succeeds).
diff --git a/src/ds_platform_utils/sql_utils.py b/src/ds_platform_utils/sql_utils.py index bc224fe..755c511 100644 --- a/src/ds_platform_utils/sql_utils.py +++ b/src/ds_platform_utils/sql_utils.py @@ -80,6 +80,7 @@ def _attr(name: str, default: str = "unknown") -> str: "step_name": _attr("step_name"), "run_id": _attr("run_id"), "user": _attr("username"), + "owner": _extract("ds.owner"), "domain": _extract("ds.domain"), "namespace": _attr("namespace"), "perimeter": str(os.environ.get("OB_CURRENT_PERIMETER") or os.environ.get("OBP_PERIMETER")), diff --git a/tests/unit_tests/snowflake/test__object_tags.py b/tests/unit_tests/snowflake/test__object_tags.py index 6d8ebdb..259623b 100644 --- a/tests/unit_tests/snowflake/test__object_tags.py +++ b/tests/unit_tests/snowflake/test__object_tags.py @@ -62,6 +62,28 @@ def test_build_table_tags_explicit_owner_beats_domain_derivation(): assert tags["TABLE_OWNER"] == "jane" +def test_build_table_tags_ds_owner_flow_tag_beats_domain_derivation(): + """A `ds.owner` flow tag is used (priority 2) over the domain-derived team alias.""" + + class DsOwnerCurrent(FakeCurrent): + tags = ["ds.domain:recommendations", "ds.project:two_tower_v2", "ds.owner:john_doe"] + + tags = build_table_tags(current_obj=DsOwnerCurrent()) + + assert tags["TABLE_OWNER"] == "john_doe" + + +def test_build_table_tags_explicit_owner_beats_ds_owner_flow_tag(): + """An explicit override (priority 1) wins over the `ds.owner` flow tag (priority 2).""" + + class DsOwnerCurrent(FakeCurrent): + tags = ["ds.domain:recommendations", "ds.project:two_tower_v2", "ds.owner:john_doe"] + + tags = build_table_tags(tags_override={"owner": "jane"}, current_obj=DsOwnerCurrent()) + + assert tags["TABLE_OWNER"] == "jane" + + def test_build_table_tags_overrides_win(): """Overrides (incl. alias + cased keys) replace derived values and add SLA/CONTACT.""" tags = build_table_tags(