Skip to content

Commit e89a464

Browse files
author
Joseph Thomas Stalin
committed
fix(clickhouse): add downstream lineage for MATERIALIZED VIEW TO clause
1 parent 1c8b300 commit e89a464

2 files changed

Lines changed: 259 additions & 1 deletion

File tree

ingestion/src/metadata/ingestion/lineage/parser.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
"""
1212
Lineage Parser configuration
1313
"""
14+
1415
import hashlib
16+
import re
1517
import time
1618
import traceback
1719
from collections import defaultdict
@@ -64,6 +66,31 @@
6466
# max memory in MB that lineage parsing can consume
6567
LINEAGE_PARSING_MEMORY_LIMIT_MB = 100
6668

69+
# Pre-compiled regex to rewrite ClickHouse MATERIALIZED VIEW ... TO <target> queries
70+
# into CREATE TABLE <target> AS SELECT ... so that sqllineage can correctly identify
71+
# the downstream target table instead of the view name itself.
72+
#
73+
# Handles all documented ClickHouse CREATE MATERIALIZED VIEW forms:
74+
# 1. CREATE MATERIALIZED VIEW [IF NOT EXISTS] mv_name [ON CLUSTER c] TO target AS SELECT ...
75+
# 2. CREATE MATERIALIZED VIEW mv_name REFRESH EVERY n HOUR [OFFSET m MINUTE] TO target (col1, col2)
76+
# [DEFINER = user] [SQL SECURITY ...] AS SELECT ...
77+
# Also: REFRESH AFTER n SECOND form (alternative ClickHouse refresh syntax)
78+
# 3. ENGINE = ... clauses between TO target and AS SELECT are skipped.
79+
#
80+
# The character class for <target> handles backtick-quoted segments with spaces
81+
# and stops at the first whitespace / opening paren NOT inside backticks.
82+
_CLICKHOUSE_MV_TO_RE = re.compile(
83+
r"^\s*CREATE\s+MATERIALIZED\s+VIEW\s+"
84+
r"(?:IF\s+NOT\s+EXISTS\s+)?" # optional IF NOT EXISTS
85+
r"(?:`[^`]+`|\S+)\s+" # skip MV name (handles quoted names with spaces)
86+
r"(?:ON\s+CLUSTER\s+\S+\s+)?" # optional ON CLUSTER <cluster_name>
87+
r"(?:REFRESH\s+(?:EVERY|AFTER)\s+(?:(?!\bTO\b)[\s\S])*?)?" # optional REFRESH
88+
r"TO\s+((?:`[^`]+`|[\w`\.\[\]\"])+)" # capture target table
89+
r"(?:\s*\([^)]*\))?" # optional column list (col1, col2, ...)
90+
r".*?AS\s+(SELECT.*)", # skip ENGINE/DEFINER/SETTINGS, capture SELECT body
91+
re.IGNORECASE | re.DOTALL,
92+
)
93+
6794

6895
class LineageParser:
6996
"""
@@ -234,7 +261,7 @@ def table_aliases(self) -> Dict[str, str]:
234261
# Check if involved_tables is present
235262
if not self.involved_tables:
236263
logger.debug(
237-
f"[{self.query_hash}] [UsageSink] No involved tables found alias map will be empty."
264+
f"[{self.query_hash}] [UsageSink] No involved tables found -- alias map will be empty."
238265
)
239266
return {}
240267

@@ -512,6 +539,22 @@ def clean_raw_query(cls, raw_query: str) -> Optional[str]:
512539

513540
clean_query = clean_query.replace("\\n", "\n")
514541

542+
# Rewrite ClickHouse MATERIALIZED VIEW ... TO <target> AS SELECT ...
543+
# into CREATE TABLE <target> AS SELECT ... so that sqllineage correctly
544+
# identifies the downstream target table instead of the view name.
545+
#
546+
# Without this rewrite, sqllineage treats the MV name as the CREATE target
547+
# and never registers the table named after TO as a downstream node.
548+
# We handle it at this layer (query normalisation) so all three parsers
549+
# (SqlGlot, SqlFluff, SqlParse) benefit automatically and no synthetic
550+
# queries are written to query history.
551+
if insensitive_match(clean_query, r"^\s*CREATE\s+MATERIALIZED\s+VIEW\s+"):
552+
mv_to_match = _CLICKHOUSE_MV_TO_RE.search(clean_query)
553+
if mv_to_match:
554+
target_table = mv_to_match.group(1).strip()
555+
select_body = mv_to_match.group(2)
556+
clean_query = f"CREATE TABLE {target_table} AS {select_body}"
557+
515558
if insensitive_match(
516559
clean_query, r"\s*/\*.*?\*/\s*merge.*into.*?when matched.*?"
517560
):

ingestion/tests/unit/lineage/test_sql_lineage.py

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"""
1313
sql lineage utils tests
1414
"""
15+
1516
import uuid
1617
from unittest import TestCase
1718

@@ -459,3 +460,217 @@ def test_stage_lineage_with_file_format_options(self):
459460
len(parser.target_tables) > 0,
460461
f"Expected target tables for query: {query}",
461462
)
463+
464+
465+
class ClickhouseMaterializedViewLineageTest(TestCase):
466+
"""
467+
Tests for ClickHouse MATERIALIZED VIEW TO <target> lineage rewriting
468+
in LineageParser.clean_raw_query.
469+
"""
470+
471+
def test_simple_materialized_view_to(self):
472+
"""
473+
Basic MATERIALIZED VIEW ... TO target AS SELECT is rewritten so
474+
the target table -- not the MV -- appears as the write table.
475+
"""
476+
query = (
477+
"CREATE MATERIALIZED VIEW default.my_mv "
478+
"TO default.my_target "
479+
"AS SELECT * FROM default.my_source"
480+
)
481+
cleaned = LineageParser.clean_raw_query(query)
482+
self.assertIsNotNone(cleaned)
483+
self.assertIn("CREATE TABLE default.my_target", cleaned)
484+
self.assertNotIn("my_mv", cleaned)
485+
486+
def test_materialized_view_with_engine_clause(self):
487+
"""
488+
ENGINE clause between TO and AS must not be captured as the target name.
489+
"""
490+
query = (
491+
"CREATE MATERIALIZED VIEW db.mv_name "
492+
"TO db.target_table "
493+
"ENGINE = MergeTree() "
494+
"AS SELECT id, name FROM db.source_table"
495+
)
496+
cleaned = LineageParser.clean_raw_query(query)
497+
self.assertIsNotNone(cleaned)
498+
self.assertIn("CREATE TABLE db.target_table", cleaned)
499+
self.assertNotIn("ENGINE", cleaned.split("AS SELECT")[0])
500+
501+
def test_materialized_view_if_not_exists(self):
502+
"""
503+
IF NOT EXISTS variant should still resolve the correct target table.
504+
"""
505+
query = (
506+
"CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.mv_clicks "
507+
"TO analytics.clicks_agg "
508+
"AS SELECT page, count(*) as cnt FROM analytics.raw_clicks GROUP BY page"
509+
)
510+
cleaned = LineageParser.clean_raw_query(query)
511+
self.assertIsNotNone(cleaned)
512+
self.assertIn("CREATE TABLE analytics.clicks_agg", cleaned)
513+
514+
def test_materialized_view_on_cluster(self):
515+
"""
516+
ON CLUSTER clause should be skipped and not interfere with TO parsing.
517+
"""
518+
query = (
519+
"CREATE MATERIALIZED VIEW cluster_db.mv_events "
520+
"ON CLUSTER my_cluster "
521+
"TO cluster_db.events_summary "
522+
"AS SELECT event_type, count() FROM cluster_db.events GROUP BY event_type"
523+
)
524+
cleaned = LineageParser.clean_raw_query(query)
525+
self.assertIsNotNone(cleaned)
526+
self.assertIn("CREATE TABLE cluster_db.events_summary", cleaned)
527+
528+
def test_materialized_view_without_to_clause_unchanged(self):
529+
"""
530+
A MATERIALIZED VIEW without a TO clause should not be transformed --
531+
the query is returned as-is (or filtered by the caller).
532+
"""
533+
query = "CREATE MATERIALIZED VIEW my_mv AS SELECT * FROM source_table"
534+
cleaned = LineageParser.clean_raw_query(query)
535+
# Without TO, no transformation; the query should pass through unchanged
536+
self.assertIsNotNone(cleaned)
537+
self.assertNotIn("CREATE TABLE", cleaned)
538+
539+
def test_clean_query_produces_correct_lineage(self):
540+
"""
541+
End-to-end: after clean_raw_query the LineageParser should identify
542+
the TO target as the write table, not the MV name.
543+
"""
544+
query = (
545+
"CREATE MATERIALIZED VIEW default.mv_orders "
546+
"TO default.orders_agg "
547+
"AS SELECT customer_id, sum(total) FROM default.orders GROUP BY customer_id"
548+
)
549+
cleaned = LineageParser.clean_raw_query(query)
550+
self.assertIsNotNone(cleaned)
551+
parser = LineageParser(cleaned)
552+
target_names = [str(t) for t in parser.target_tables]
553+
source_names = [str(t) for t in parser.source_tables]
554+
self.assertTrue(
555+
any("orders_agg" in t for t in target_names),
556+
f"Expected orders_agg in targets, got: {target_names}",
557+
)
558+
self.assertTrue(
559+
any("orders" in s for s in source_names),
560+
f"Expected orders in sources, got: {source_names}",
561+
)
562+
563+
def test_materialized_view_refresh_every_with_definer(self):
564+
"""
565+
REFRESH EVERY ... TO <target> (col_list) DEFINER = <user> SQL SECURITY DEFINER
566+
AS SELECT ... -- the second exact variant from issue #26265.
567+
568+
The REFRESH EVERY clause, the column list between TO and AS, and the
569+
DEFINER / SQL SECURITY tokens must all be swallowed without polluting
570+
the captured target table name or SELECT body.
571+
"""
572+
query = (
573+
"CREATE MATERIALIZED VIEW schema01.samples_mv "
574+
"REFRESH EVERY 3 HOUR "
575+
"TO schema02.samples_e "
576+
"(column_01, column_02) "
577+
"DEFINER = myuser SQL SECURITY DEFINER "
578+
"AS SELECT * FROM schema01.samples"
579+
)
580+
cleaned = LineageParser.clean_raw_query(query)
581+
self.assertIsNotNone(cleaned)
582+
# Target must be the TO table, not the MV name
583+
self.assertIn("CREATE TABLE schema02.samples_e", cleaned)
584+
self.assertNotIn("samples_mv", cleaned)
585+
# The SELECT body must be intact
586+
self.assertIn("SELECT * FROM schema01.samples", cleaned)
587+
# DEFINER / REFRESH tokens must not appear before SELECT
588+
prefix = cleaned.split("SELECT")[0].upper()
589+
self.assertNotIn("DEFINER", prefix)
590+
self.assertNotIn("REFRESH", prefix)
591+
592+
def test_materialized_view_with_column_list(self):
593+
"""
594+
Column list (col1, col2) between the TO target and AS SELECT must be
595+
discarded -- it must not be captured as part of the target table name.
596+
Covers the first exact query from issue #26265:
597+
598+
CREATE MATERIALIZED VIEW <schema-01>.samples_mv
599+
TO <schema-02>.samples_e (column_01, column_02)
600+
FROM <schema-01>.samples
601+
"""
602+
query = (
603+
"CREATE MATERIALIZED VIEW schema01.samples_mv "
604+
"TO schema02.samples_e "
605+
"(column_01, column_02) "
606+
"AS SELECT column_01, column_02 FROM schema01.samples"
607+
)
608+
cleaned = LineageParser.clean_raw_query(query)
609+
self.assertIsNotNone(cleaned)
610+
self.assertIn("CREATE TABLE schema02.samples_e", cleaned)
611+
# Column list must not bleed into the table name
612+
self.assertNotIn("(column_01", cleaned.split("AS SELECT")[0])
613+
614+
def test_materialized_view_refresh_every(self):
615+
"""
616+
REFRESH EVERY clause should be handled and the target table correctly identified.
617+
"""
618+
query = (
619+
"CREATE MATERIALIZED VIEW default.mv_refresh "
620+
"REFRESH EVERY INTERVAL 1 MINUTE "
621+
"TO default.target_refresh "
622+
"AS SELECT * FROM default.source_refresh"
623+
)
624+
cleaned = LineageParser.clean_raw_query(query)
625+
self.assertIsNotNone(cleaned)
626+
self.assertIn("CREATE TABLE default.target_refresh", cleaned)
627+
self.assertIn("AS SELECT * FROM default.source_refresh", cleaned)
628+
629+
def test_materialized_view_refresh_after(self):
630+
"""
631+
REFRESH AFTER form (alternative ClickHouse refresh syntax) should be handled.
632+
This addresses the gitar-bot edge-case comment: REFRESH AFTER was not matched
633+
by the previous REFRESH EVERY-only pattern.
634+
"""
635+
query = (
636+
"CREATE MATERIALIZED VIEW default.mv_after "
637+
"REFRESH AFTER 30 SECOND "
638+
"TO default.target_after "
639+
"AS SELECT id, ts FROM default.events"
640+
)
641+
cleaned = LineageParser.clean_raw_query(query)
642+
self.assertIsNotNone(cleaned)
643+
self.assertIn("CREATE TABLE default.target_after", cleaned)
644+
self.assertNotIn("mv_after", cleaned)
645+
self.assertIn("AS SELECT id, ts FROM default.events", cleaned)
646+
647+
def test_materialized_view_refresh_every_with_offset(self):
648+
"""
649+
REFRESH EVERY n HOUR OFFSET m MINUTE - complex multi-token interval form.
650+
"""
651+
query = (
652+
"CREATE MATERIALIZED VIEW default.mv_offset "
653+
"REFRESH EVERY 1 DAY OFFSET 30 MINUTE "
654+
"TO default.target_offset "
655+
"AS SELECT * FROM default.source_offset"
656+
)
657+
cleaned = LineageParser.clean_raw_query(query)
658+
self.assertIsNotNone(cleaned)
659+
self.assertIn("CREATE TABLE default.target_offset", cleaned)
660+
self.assertNotIn("OFFSET", cleaned.split("AS SELECT")[0].upper())
661+
662+
def test_materialized_view_quoted_with_spaces(self):
663+
"""
664+
Verify that quoted identifiers containing spaces (e.g. `db name`.`table name`)
665+
are correctly captured by the regex and not truncated at the space.
666+
"""
667+
query = (
668+
"CREATE MATERIALIZED VIEW `my mv` "
669+
"TO `my schema`.`my table` "
670+
"AS SELECT * FROM `source schema`.`source table`"
671+
)
672+
cleaned = LineageParser.clean_raw_query(query)
673+
self.assertIsNotNone(cleaned)
674+
# Should NOT be truncated to "CREATE TABLE `my"
675+
self.assertIn("CREATE TABLE `my schema`.`my table` AS SELECT", cleaned)
676+
self.assertIn("FROM `source schema`.`source table`", cleaned)

0 commit comments

Comments
 (0)