Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion ingestion/src/metadata/ingestion/lineage/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
"""
Lineage Parser configuration
"""

import hashlib
import re
import time
import traceback
from collections import defaultdict
Expand Down Expand Up @@ -64,6 +66,31 @@
# max memory in MB that lineage parsing can consume
LINEAGE_PARSING_MEMORY_LIMIT_MB = 100

# Pre-compiled regex to rewrite ClickHouse MATERIALIZED VIEW ... TO <target> queries
# into CREATE TABLE <target> AS SELECT ... so that sqllineage can correctly identify
# the downstream target table instead of the view name itself.
#
# Handles all documented ClickHouse CREATE MATERIALIZED VIEW forms:
# 1. CREATE MATERIALIZED VIEW [IF NOT EXISTS] mv_name [ON CLUSTER c] TO target AS SELECT ...
# 2. CREATE MATERIALIZED VIEW mv_name REFRESH EVERY n HOUR [OFFSET m MINUTE] TO target (col1, col2)
# [DEFINER = user] [SQL SECURITY ...] AS SELECT ...
# Also: REFRESH AFTER n SECOND form (alternative ClickHouse refresh syntax)
# 3. ENGINE = ... clauses between TO target and AS SELECT are skipped.
#
# The character class for <target> handles backtick-quoted segments with spaces
# and stops at the first whitespace / opening paren NOT inside backticks.
Comment on lines +80 to +81
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Quality: Stray indentation on module-level comment

Line 80 has a 4-space indent on a module-level comment (`# The character class for <target>...`). While Python ignores comment indentation, this visually suggests that the comment belongs to a code block rather than continuing the module-level documentation block above it.

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

_CLICKHOUSE_MV_TO_RE = re.compile(
r"^\s*CREATE\s+MATERIALIZED\s+VIEW\s+"
r"(?:IF\s+NOT\s+EXISTS\s+)?" # optional IF NOT EXISTS
r"(?:`[^`]+`|\S+)\s+" # skip MV name (handles quoted names with spaces)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Edge Case: MV-name skip group fails for multi-part backtick-quoted names

The regex group that skips the MV name on line 85 (``(?:`[^`]+`|\S+)\s+``) only matches a single backtick-quoted segment. For a multi-part backtick-quoted MV name like `` `my schema`.`my mv` ``, it would match `` `my schema` `` and then expect whitespace, but would instead encounter `.`, causing the entire regex to fail and the rewrite to be silently skipped.

This is an edge case (ClickHouse MV names with spaces in both schema and table parts are rare), but it would result in a silent false-negative where the lineage is not captured.

Suggested fix:

Change the MV-name skip group to repeat, matching the same pattern as the target capture:

  r"(?:(?:`[^`]+`|\S+)\.)*(?:`[^`]+`|\S+)\s+"  # skip MV name (multi-part, quoted)

Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion

r"(?:ON\s+CLUSTER\s+\S+\s+)?" # optional ON CLUSTER <cluster_name>
r"(?:REFRESH\s+(?:EVERY|AFTER)\s+(?:(?!\bTO\b)[\s\S])*?)?" # optional REFRESH
r"TO\s+((?:`[^`]+`|[\w`\.\[\]\"])+)" # capture target table
r"(?:\s*\([^)]*\))?" # optional column list (col1, col2, ...)
r".*?AS\s+(SELECT.*)", # skip ENGINE/DEFINER/SETTINGS, capture SELECT body
re.IGNORECASE | re.DOTALL,
)


class LineageParser:
"""
Expand Down Expand Up @@ -234,7 +261,7 @@ def table_aliases(self) -> Dict[str, str]:
# Check if involved_tables is present
if not self.involved_tables:
logger.debug(
f"[{self.query_hash}] [UsageSink] No involved tables found alias map will be empty."
f"[{self.query_hash}] [UsageSink] No involved tables found -- alias map will be empty."
)
return {}

Expand Down Expand Up @@ -512,6 +539,22 @@ def clean_raw_query(cls, raw_query: str) -> Optional[str]:

clean_query = clean_query.replace("\\n", "\n")

# Rewrite ClickHouse MATERIALIZED VIEW ... TO <target> AS SELECT ...
# into CREATE TABLE <target> AS SELECT ... so that sqllineage correctly
# identifies the downstream target table instead of the view name.
#
# Without this rewrite, sqllineage treats the MV name as the CREATE target
# and never registers the table named after TO as a downstream node.
# We handle it at this layer (query normalisation) so all three parsers
# (SqlGlot, SqlFluff, SqlParse) benefit automatically and no synthetic
# queries are written to query history.
if insensitive_match(clean_query, r"^\s*CREATE\s+MATERIALIZED\s+VIEW\s+"):
mv_to_match = _CLICKHOUSE_MV_TO_RE.search(clean_query)
if mv_to_match:
target_table = mv_to_match.group(1).strip()
select_body = mv_to_match.group(2)
clean_query = f"CREATE TABLE {target_table} AS {select_body}"

if insensitive_match(
clean_query, r"\s*/\*.*?\*/\s*merge.*into.*?when matched.*?"
):
Expand Down
215 changes: 215 additions & 0 deletions ingestion/tests/unit/lineage/test_sql_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""
sql lineage utils tests
"""

import uuid
from unittest import TestCase

Expand Down Expand Up @@ -459,3 +460,217 @@ def test_stage_lineage_with_file_format_options(self):
len(parser.target_tables) > 0,
f"Expected target tables for query: {query}",
)


class ClickhouseMaterializedViewLineageTest(TestCase):
    """
    Tests for the ClickHouse ``CREATE MATERIALIZED VIEW ... TO <target>``
    rewrite performed by ``LineageParser.clean_raw_query``.

    Each test feeds one syntactic variant of the statement through
    ``clean_raw_query`` and checks the text of the rewritten query; one
    end-to-end test additionally runs the result through ``LineageParser``.
    """

    def _clean(self, query: str) -> str:
        """
        Run ``LineageParser.clean_raw_query`` on ``query`` and return the
        result, failing the test immediately if the query was dropped
        (``None``) so later string assertions never operate on ``None``.
        """
        cleaned = LineageParser.clean_raw_query(query)
        self.assertIsNotNone(cleaned)
        return cleaned

    def test_simple_materialized_view_to(self):
        """
        Plain ``MATERIALIZED VIEW ... TO target AS SELECT`` is rewritten so
        that the target table -- not the MV -- is the created table.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW default.my_mv "
            "TO default.my_target "
            "AS SELECT * FROM default.my_source"
        )
        self.assertIn("CREATE TABLE default.my_target", cleaned)
        self.assertNotIn("my_mv", cleaned)

    def test_materialized_view_with_engine_clause(self):
        """
        An ENGINE clause between ``TO target`` and ``AS SELECT`` must not end
        up in the rewritten statement's head.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW db.mv_name "
            "TO db.target_table "
            "ENGINE = MergeTree() "
            "AS SELECT id, name FROM db.source_table"
        )
        self.assertIn("CREATE TABLE db.target_table", cleaned)
        # Only the part before the SELECT body is inspected.
        self.assertNotIn("ENGINE", cleaned.split("AS SELECT")[0])

    def test_materialized_view_if_not_exists(self):
        """
        The ``IF NOT EXISTS`` variant still resolves the correct target table.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.mv_clicks "
            "TO analytics.clicks_agg "
            "AS SELECT page, count(*) as cnt FROM analytics.raw_clicks GROUP BY page"
        )
        self.assertIn("CREATE TABLE analytics.clicks_agg", cleaned)

    def test_materialized_view_on_cluster(self):
        """
        An ``ON CLUSTER`` clause is skipped and does not disturb TO parsing.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW cluster_db.mv_events "
            "ON CLUSTER my_cluster "
            "TO cluster_db.events_summary "
            "AS SELECT event_type, count() FROM cluster_db.events GROUP BY event_type"
        )
        self.assertIn("CREATE TABLE cluster_db.events_summary", cleaned)

    def test_materialized_view_without_to_clause_unchanged(self):
        """
        A MATERIALIZED VIEW without a ``TO`` clause must not be rewritten
        into a ``CREATE TABLE`` statement.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW my_mv AS SELECT * FROM source_table"
        )
        # No TO clause, so the CREATE TABLE rewrite must not have been applied.
        self.assertNotIn("CREATE TABLE", cleaned)

    def test_clean_query_produces_correct_lineage(self):
        """
        End-to-end: after ``clean_raw_query`` the ``LineageParser`` reports
        the TO target as a target table and the SELECT source as a source.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW default.mv_orders "
            "TO default.orders_agg "
            "AS SELECT customer_id, sum(total) FROM default.orders GROUP BY customer_id"
        )
        parser = LineageParser(cleaned)
        target_names = [str(t) for t in parser.target_tables]
        source_names = [str(t) for t in parser.source_tables]
        self.assertTrue(
            any("orders_agg" in t for t in target_names),
            f"Expected orders_agg in targets, got: {target_names}",
        )
        # Compare the final dotted segment exactly: a plain substring check
        # for "orders" would also be satisfied by "orders_agg", i.e. by the
        # target table being wrongly reported as a source.
        self.assertTrue(
            any(s.split(".")[-1] == "orders" for s in source_names),
            f"Expected orders in sources, got: {source_names}",
        )

    def test_materialized_view_refresh_every_with_definer(self):
        """
        ``REFRESH EVERY ... TO <target> (col_list) DEFINER = <user>
        SQL SECURITY DEFINER AS SELECT ...`` -- the second variant referenced
        from issue #26265.

        The REFRESH EVERY clause, the column list and the DEFINER /
        SQL SECURITY tokens must all be dropped without leaking into the
        target table name or the SELECT body.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW schema01.samples_mv "
            "REFRESH EVERY 3 HOUR "
            "TO schema02.samples_e "
            "(column_01, column_02) "
            "DEFINER = myuser SQL SECURITY DEFINER "
            "AS SELECT * FROM schema01.samples"
        )
        # The created table is the TO target, not the MV itself.
        self.assertIn("CREATE TABLE schema02.samples_e", cleaned)
        self.assertNotIn("samples_mv", cleaned)
        # The SELECT body survives intact.
        self.assertIn("SELECT * FROM schema01.samples", cleaned)
        # Nothing from the skipped clauses precedes the SELECT.
        prefix = cleaned.split("SELECT")[0].upper()
        self.assertNotIn("DEFINER", prefix)
        self.assertNotIn("REFRESH", prefix)

    def test_materialized_view_with_column_list(self):
        """
        A column list ``(col1, col2)`` between the TO target and
        ``AS SELECT`` is discarded rather than captured as part of the
        target table name. Modelled on the first variant referenced from
        issue #26265 (TO target followed by a column list).
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW schema01.samples_mv "
            "TO schema02.samples_e "
            "(column_01, column_02) "
            "AS SELECT column_01, column_02 FROM schema01.samples"
        )
        self.assertIn("CREATE TABLE schema02.samples_e", cleaned)
        # The column list must not bleed into the statement head.
        self.assertNotIn("(column_01", cleaned.split("AS SELECT")[0])

    def test_materialized_view_refresh_every(self):
        """
        A ``REFRESH EVERY`` clause is skipped and the target table is still
        identified correctly.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW default.mv_refresh "
            "REFRESH EVERY INTERVAL 1 MINUTE "
            "TO default.target_refresh "
            "AS SELECT * FROM default.source_refresh"
        )
        self.assertIn("CREATE TABLE default.target_refresh", cleaned)
        self.assertIn("AS SELECT * FROM default.source_refresh", cleaned)

    def test_materialized_view_refresh_after(self):
        """
        The ``REFRESH AFTER`` form (the alternative ClickHouse refresh
        syntax) is handled the same way as ``REFRESH EVERY``.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW default.mv_after "
            "REFRESH AFTER 30 SECOND "
            "TO default.target_after "
            "AS SELECT id, ts FROM default.events"
        )
        self.assertIn("CREATE TABLE default.target_after", cleaned)
        self.assertNotIn("mv_after", cleaned)
        self.assertIn("AS SELECT id, ts FROM default.events", cleaned)

    def test_materialized_view_refresh_every_with_offset(self):
        """
        ``REFRESH EVERY <interval> OFFSET <interval>`` -- the multi-token
        interval form -- leaves no OFFSET token in the statement head.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW default.mv_offset "
            "REFRESH EVERY 1 DAY OFFSET 30 MINUTE "
            "TO default.target_offset "
            "AS SELECT * FROM default.source_offset"
        )
        self.assertIn("CREATE TABLE default.target_offset", cleaned)
        self.assertNotIn("OFFSET", cleaned.split("AS SELECT")[0].upper())

    def test_materialized_view_quoted_with_spaces(self):
        """
        Backtick-quoted identifiers containing spaces (e.g.
        `db name`.`table name`) are captured whole and not truncated at
        the first space.
        """
        cleaned = self._clean(
            "CREATE MATERIALIZED VIEW `my mv` "
            "TO `my schema`.`my table` "
            "AS SELECT * FROM `source schema`.`source table`"
        )
        # Must NOT be truncated to "CREATE TABLE `my".
        self.assertIn("CREATE TABLE `my schema`.`my table` AS SELECT", cleaned)
        self.assertIn("FROM `source schema`.`source table`", cleaned)
Loading