Skip to content

Commit 6e4e90b

Browse files
author
Joseph Thomas Stalin
committed
fix: tighten MV TO regex; add unit tests for ClickHouse lineage rewrite
1 parent 8236888 commit 6e4e90b

2 files changed

Lines changed: 114 additions & 6 deletions

File tree

ingestion/src/metadata/ingestion/lineage/parser.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -513,17 +513,23 @@ def clean_raw_query(cls, raw_query: str) -> Optional[str]:
513513

514514
clean_query = clean_query.replace("\\n", "\n")
515515

516-
# Convert Clickhouse MATERIALIZED VIEW ... TO ... AS SELECT ...
517-
# into CREATE TABLE ... AS SELECT ... so sqllineage parses the correct target
516+
# Convert ClickHouse MATERIALIZED VIEW ... TO <target> AS SELECT ...
517+
# into CREATE TABLE <target> AS SELECT ... so sqllineage identifies the
518+
# correct downstream target instead of the view name.
519+
# Handles optional ON CLUSTER and ENGINE/POPULATE/SETTINGS clauses.
518520
if insensitive_match(clean_query, r"^\s*CREATE\s+MATERIALIZED\s+VIEW\s+"):
519521
mv_to_match = re.search(
520-
r"^\s*CREATE\s+MATERIALIZED\s+VIEW\s+(?:IF\s+NOT\s+EXISTS\s+)?(.*?)\s+TO\s+(.*?)\s+AS\s+(SELECT.*)",
522+
r"^\s*CREATE\s+MATERIALIZED\s+VIEW\s+"
523+
r"(?:IF\s+NOT\s+EXISTS\s+)?\S+\s+" # skip MV name
524+
r"(?:ON\s+CLUSTER\s+\S+\s+)?" # optional ON CLUSTER
525+
r"TO\s+([\w`\.\[\]\"]+)" # target table (stop at whitespace)
526+
r".*?AS\s+(SELECT.*)", # skip ENGINE/SETTINGS, capture SELECT
521527
clean_query,
522-
re.IGNORECASE | re.DOTALL
528+
re.IGNORECASE | re.DOTALL,
523529
)
524530
if mv_to_match:
525-
target_table = mv_to_match.group(2)
526-
select_query = mv_to_match.group(3)
531+
target_table = mv_to_match.group(1).strip()
532+
select_query = mv_to_match.group(2)
527533
clean_query = f"CREATE TABLE {target_table} AS {select_query}"
528534

529535
if insensitive_match(

ingestion/tests/unit/lineage/test_sql_lineage.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,3 +459,105 @@ def test_stage_lineage_with_file_format_options(self):
459459
len(parser.target_tables) > 0,
460460
f"Expected target tables for query: {query}",
461461
)
462+
463+
464+
class ClickhouseMaterializedViewLineageTest(TestCase):
465+
"""
466+
Tests for ClickHouse MATERIALIZED VIEW TO <target> lineage rewriting
467+
in LineageParser.clean_raw_query.
468+
"""
469+
470+
def test_simple_materialized_view_to(self):
471+
"""
472+
Basic MATERIALIZED VIEW ... TO target AS SELECT is rewritten so
473+
the target table — not the MV — appears as the write table.
474+
"""
475+
query = (
476+
"CREATE MATERIALIZED VIEW default.my_mv "
477+
"TO default.my_target "
478+
"AS SELECT * FROM default.my_source"
479+
)
480+
cleaned = LineageParser.clean_raw_query(query)
481+
self.assertIsNotNone(cleaned)
482+
self.assertIn("CREATE TABLE default.my_target", cleaned)
483+
self.assertNotIn("my_mv", cleaned)
484+
485+
def test_materialized_view_with_engine_clause(self):
486+
"""
487+
ENGINE clause between TO and AS must not be captured as the target name.
488+
"""
489+
query = (
490+
"CREATE MATERIALIZED VIEW db.mv_name "
491+
"TO db.target_table "
492+
"ENGINE = MergeTree() "
493+
"AS SELECT id, name FROM db.source_table"
494+
)
495+
cleaned = LineageParser.clean_raw_query(query)
496+
self.assertIsNotNone(cleaned)
497+
self.assertIn("CREATE TABLE db.target_table", cleaned)
498+
self.assertNotIn("ENGINE", cleaned.split("AS SELECT")[0])
499+
500+
def test_materialized_view_if_not_exists(self):
501+
"""
502+
IF NOT EXISTS variant should still resolve the correct target table.
503+
"""
504+
query = (
505+
"CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.mv_clicks "
506+
"TO analytics.clicks_agg "
507+
"AS SELECT page, count(*) as cnt FROM analytics.raw_clicks GROUP BY page"
508+
)
509+
cleaned = LineageParser.clean_raw_query(query)
510+
self.assertIsNotNone(cleaned)
511+
self.assertIn("CREATE TABLE analytics.clicks_agg", cleaned)
512+
513+
def test_materialized_view_on_cluster(self):
514+
"""
515+
ON CLUSTER clause should be skipped and not interfere with TO parsing.
516+
"""
517+
query = (
518+
"CREATE MATERIALIZED VIEW cluster_db.mv_events "
519+
"ON CLUSTER my_cluster "
520+
"TO cluster_db.events_summary "
521+
"AS SELECT event_type, count() FROM cluster_db.events GROUP BY event_type"
522+
)
523+
cleaned = LineageParser.clean_raw_query(query)
524+
self.assertIsNotNone(cleaned)
525+
self.assertIn("CREATE TABLE cluster_db.events_summary", cleaned)
526+
527+
def test_materialized_view_without_to_clause_unchanged(self):
528+
"""
529+
A MATERIALIZED VIEW without a TO clause should not be transformed —
530+
the query is returned as-is (or filtered by the caller).
531+
"""
532+
query = (
533+
"CREATE MATERIALIZED VIEW my_mv AS SELECT * FROM source_table"
534+
)
535+
cleaned = LineageParser.clean_raw_query(query)
536+
# Without TO, no transformation; the query should pass through unchanged
537+
self.assertIsNotNone(cleaned)
538+
self.assertNotIn("CREATE TABLE", cleaned)
539+
540+
def test_clean_query_produces_correct_lineage(self):
541+
"""
542+
End-to-end: after clean_raw_query the LineageParser should identify
543+
the TO target as the write table, not the MV name.
544+
"""
545+
query = (
546+
"CREATE MATERIALIZED VIEW default.mv_orders "
547+
"TO default.orders_agg "
548+
"AS SELECT customer_id, sum(total) FROM default.orders GROUP BY customer_id"
549+
)
550+
cleaned = LineageParser.clean_raw_query(query)
551+
self.assertIsNotNone(cleaned)
552+
parser = LineageParser(cleaned)
553+
target_names = [str(t) for t in parser.target_tables]
554+
source_names = [str(t) for t in parser.source_tables]
555+
self.assertTrue(
556+
any("orders_agg" in t for t in target_names),
557+
f"Expected orders_agg in targets, got: {target_names}",
558+
)
559+
self.assertTrue(
560+
any("orders" in s for s in source_names),
561+
f"Expected orders in sources, got: {source_names}",
562+
)
563+

0 commit comments

Comments
 (0)