diff --git a/ingestion/src/metadata/ingestion/lineage/parser.py b/ingestion/src/metadata/ingestion/lineage/parser.py index 2154c2d2b104..e8056128c70e 100644 --- a/ingestion/src/metadata/ingestion/lineage/parser.py +++ b/ingestion/src/metadata/ingestion/lineage/parser.py @@ -11,7 +11,9 @@ """ Lineage Parser configuration """ + import hashlib +import re import time import traceback from collections import defaultdict @@ -64,6 +66,31 @@ # max memory in MB that lineage parsing can consume LINEAGE_PARSING_MEMORY_LIMIT_MB = 100 +# Pre-compiled regex to rewrite ClickHouse MATERIALIZED VIEW ... TO queries +# into CREATE TABLE AS SELECT ... so that sqllineage can correctly identify +# the downstream target table instead of the view name itself. +# +# Handles all documented ClickHouse CREATE MATERIALIZED VIEW forms: +# 1. CREATE MATERIALIZED VIEW [IF NOT EXISTS] mv_name [ON CLUSTER c] TO target AS SELECT ... +# 2. CREATE MATERIALIZED VIEW mv_name REFRESH EVERY n HOUR [OFFSET m MINUTE] TO target (col1, col2) +# [DEFINER = user] [SQL SECURITY ...] AS SELECT ... +# Also: REFRESH AFTER n SECOND form (alternative ClickHouse refresh syntax) +# 3. ENGINE = ... clauses between TO target and AS SELECT are skipped. +# + # The character class for handles backtick-quoted segments with spaces + # and stops at the first whitespace / opening paren NOT inside backticks. + _CLICKHOUSE_MV_TO_RE = re.compile( + r"^\s*CREATE\s+MATERIALIZED\s+VIEW\s+" + r"(?:IF\s+NOT\s+EXISTS\s+)?" # optional IF NOT EXISTS + r"(?:`[^`]+`|\S+)\s+" # skip MV name (handles quoted names with spaces) + r"(?:ON\s+CLUSTER\s+\S+\s+)?" # optional ON CLUSTER + r"(?:REFRESH\s+(?:EVERY|AFTER)\s+(?:(?!\bTO\b)[\s\S])*?)?" # optional REFRESH + r"TO\s+((?:`[^`]+`|[\w`\.\[\]\"])+)" # capture target table + r"(?:\s*\([^)]*\))?" # optional column list (col1, col2, ...) + r".*?AS\s+(SELECT.*)", # skip ENGINE/DEFINER/SETTINGS, capture SELECT body + re.IGNORECASE | re.DOTALL, + ) + class LineageParser: """ @@ -234,7 +261,7 @@ def table_aliases(self) -> Dict[str, str]: # Check if involved_tables is present if not self.involved_tables: logger.debug( - f"[{self.query_hash}] [UsageSink] No involved tables found — alias map will be empty." + f"[{self.query_hash}] [UsageSink] No involved tables found -- alias map will be empty." ) return {} @@ -512,6 +539,22 @@ def clean_raw_query(cls, raw_query: str) -> Optional[str]: clean_query = clean_query.replace("\\n", "\n") + # Rewrite ClickHouse MATERIALIZED VIEW ... TO AS SELECT ... + # into CREATE TABLE AS SELECT ... so that sqllineage correctly + # identifies the downstream target table instead of the view name. + # + # Without this rewrite, sqllineage treats the MV name as the CREATE target + # and never registers the table named after TO as a downstream node. + # We handle it at this layer (query normalisation) so all three parsers + # (SqlGlot, SqlFluff, SqlParse) benefit automatically and no synthetic + # queries are written to query history. + if insensitive_match(clean_query, r"^\s*CREATE\s+MATERIALIZED\s+VIEW\s+"): + mv_to_match = _CLICKHOUSE_MV_TO_RE.search(clean_query) + if mv_to_match: + target_table = mv_to_match.group(1).strip() + select_body = mv_to_match.group(2) + clean_query = f"CREATE TABLE {target_table} AS {select_body}" + if insensitive_match( clean_query, r"\s*/\*.*?\*/\s*merge.*into.*?when matched.*?" ): diff --git a/ingestion/tests/unit/lineage/test_sql_lineage.py b/ingestion/tests/unit/lineage/test_sql_lineage.py index 6c2320635a0e..2ecc8067c56c 100644 --- a/ingestion/tests/unit/lineage/test_sql_lineage.py +++ b/ingestion/tests/unit/lineage/test_sql_lineage.py @@ -12,6 +12,7 @@ """ sql lineage utils tests """ + import uuid from unittest import TestCase @@ -459,3 +460,217 @@ def test_stage_lineage_with_file_format_options(self): len(parser.target_tables) > 0, f"Expected target tables for query: {query}", ) + + +class ClickhouseMaterializedViewLineageTest(TestCase): + """ + Tests for ClickHouse MATERIALIZED VIEW TO lineage rewriting + in LineageParser.clean_raw_query. + """ + + def test_simple_materialized_view_to(self): + """ + Basic MATERIALIZED VIEW ... TO target AS SELECT is rewritten so + the target table -- not the MV -- appears as the write table. + """ + query = ( + "CREATE MATERIALIZED VIEW default.my_mv " + "TO default.my_target " + "AS SELECT * FROM default.my_source" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + self.assertIn("CREATE TABLE default.my_target", cleaned) + self.assertNotIn("my_mv", cleaned) + + def test_materialized_view_with_engine_clause(self): + """ + ENGINE clause between TO and AS must not be captured as the target name. + """ + query = ( + "CREATE MATERIALIZED VIEW db.mv_name " + "TO db.target_table " + "ENGINE = MergeTree() " + "AS SELECT id, name FROM db.source_table" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + self.assertIn("CREATE TABLE db.target_table", cleaned) + self.assertNotIn("ENGINE", cleaned.split("AS SELECT")[0]) + + def test_materialized_view_if_not_exists(self): + """ + IF NOT EXISTS variant should still resolve the correct target table. + """ + query = ( + "CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.mv_clicks " + "TO analytics.clicks_agg " + "AS SELECT page, count(*) as cnt FROM analytics.raw_clicks GROUP BY page" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + self.assertIn("CREATE TABLE analytics.clicks_agg", cleaned) + + def test_materialized_view_on_cluster(self): + """ + ON CLUSTER clause should be skipped and not interfere with TO parsing. + """ + query = ( + "CREATE MATERIALIZED VIEW cluster_db.mv_events " + "ON CLUSTER my_cluster " + "TO cluster_db.events_summary " + "AS SELECT event_type, count() FROM cluster_db.events GROUP BY event_type" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + self.assertIn("CREATE TABLE cluster_db.events_summary", cleaned) + + def test_materialized_view_without_to_clause_unchanged(self): + """ + A MATERIALIZED VIEW without a TO clause should not be transformed -- + the query is returned as-is (or filtered by the caller). + """ + query = "CREATE MATERIALIZED VIEW my_mv AS SELECT * FROM source_table" + cleaned = LineageParser.clean_raw_query(query) + # Without TO, no transformation; the query should pass through unchanged + self.assertIsNotNone(cleaned) + self.assertNotIn("CREATE TABLE", cleaned) + + def test_clean_query_produces_correct_lineage(self): + """ + End-to-end: after clean_raw_query the LineageParser should identify + the TO target as the write table, not the MV name. + """ + query = ( + "CREATE MATERIALIZED VIEW default.mv_orders " + "TO default.orders_agg " + "AS SELECT customer_id, sum(total) FROM default.orders GROUP BY customer_id" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + parser = LineageParser(cleaned) + target_names = [str(t) for t in parser.target_tables] + source_names = [str(t) for t in parser.source_tables] + self.assertTrue( + any("orders_agg" in t for t in target_names), + f"Expected orders_agg in targets, got: {target_names}", + ) + self.assertTrue( + any("orders" in s for s in source_names), + f"Expected orders in sources, got: {source_names}", + ) + + def test_materialized_view_refresh_every_with_definer(self): + """ + REFRESH EVERY ... TO (col_list) DEFINER = SQL SECURITY DEFINER + AS SELECT ... -- the second exact variant from issue #26265. + + The REFRESH EVERY clause, the column list between TO and AS, and the + DEFINER / SQL SECURITY tokens must all be swallowed without polluting + the captured target table name or SELECT body. + """ + query = ( + "CREATE MATERIALIZED VIEW schema01.samples_mv " + "REFRESH EVERY 3 HOUR " + "TO schema02.samples_e " + "(column_01, column_02) " + "DEFINER = myuser SQL SECURITY DEFINER " + "AS SELECT * FROM schema01.samples" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + # Target must be the TO table, not the MV name + self.assertIn("CREATE TABLE schema02.samples_e", cleaned) + self.assertNotIn("samples_mv", cleaned) + # The SELECT body must be intact + self.assertIn("SELECT * FROM schema01.samples", cleaned) + # DEFINER / REFRESH tokens must not appear before SELECT + prefix = cleaned.split("SELECT")[0].upper() + self.assertNotIn("DEFINER", prefix) + self.assertNotIn("REFRESH", prefix) + + def test_materialized_view_with_column_list(self): + """ + Column list (col1, col2) between the TO target and AS SELECT must be + discarded -- it must not be captured as part of the target table name. + Covers the first exact query from issue #26265: + + CREATE MATERIALIZED VIEW .samples_mv + TO .samples_e (column_01, column_02) + FROM .samples + """ + query = ( + "CREATE MATERIALIZED VIEW schema01.samples_mv " + "TO schema02.samples_e " + "(column_01, column_02) " + "AS SELECT column_01, column_02 FROM schema01.samples" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + self.assertIn("CREATE TABLE schema02.samples_e", cleaned) + # Column list must not bleed into the table name + self.assertNotIn("(column_01", cleaned.split("AS SELECT")[0]) + + def test_materialized_view_refresh_every(self): + """ + REFRESH EVERY clause should be handled and the target table correctly identified. + """ + query = ( + "CREATE MATERIALIZED VIEW default.mv_refresh " + "REFRESH EVERY INTERVAL 1 MINUTE " + "TO default.target_refresh " + "AS SELECT * FROM default.source_refresh" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + self.assertIn("CREATE TABLE default.target_refresh", cleaned) + self.assertIn("AS SELECT * FROM default.source_refresh", cleaned) + + def test_materialized_view_refresh_after(self): + """ + REFRESH AFTER form (alternative ClickHouse refresh syntax) should be handled. + This addresses the gitar-bot edge-case comment: REFRESH AFTER was not matched + by the previous REFRESH EVERY-only pattern. + """ + query = ( + "CREATE MATERIALIZED VIEW default.mv_after " + "REFRESH AFTER 30 SECOND " + "TO default.target_after " + "AS SELECT id, ts FROM default.events" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + self.assertIn("CREATE TABLE default.target_after", cleaned) + self.assertNotIn("mv_after", cleaned) + self.assertIn("AS SELECT id, ts FROM default.events", cleaned) + + def test_materialized_view_refresh_every_with_offset(self): + """ + REFRESH EVERY n HOUR OFFSET m MINUTE - complex multi-token interval form. + """ + query = ( + "CREATE MATERIALIZED VIEW default.mv_offset " + "REFRESH EVERY 1 DAY OFFSET 30 MINUTE " + "TO default.target_offset " + "AS SELECT * FROM default.source_offset" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + self.assertIn("CREATE TABLE default.target_offset", cleaned) + self.assertNotIn("OFFSET", cleaned.split("AS SELECT")[0].upper()) + + def test_materialized_view_quoted_with_spaces(self): + """ + Verify that quoted identifiers containing spaces (e.g. `db name`.`table name`) + are correctly captured by the regex and not truncated at the space. + """ + query = ( + "CREATE MATERIALIZED VIEW `my mv` " + "TO `my schema`.`my table` " + "AS SELECT * FROM `source schema`.`source table`" + ) + cleaned = LineageParser.clean_raw_query(query) + self.assertIsNotNone(cleaned) + # Should NOT be truncated to "CREATE TABLE `my" + self.assertIn("CREATE TABLE `my schema`.`my table` AS SELECT", cleaned) + self.assertIn("FROM `source schema`.`source table`", cleaned)