@@ -459,3 +459,105 @@ def test_stage_lineage_with_file_format_options(self):
459459 len (parser .target_tables ) > 0 ,
460460 f"Expected target tables for query: { query } " ,
461461 )
462+
463+
464+ class ClickhouseMaterializedViewLineageTest (TestCase ):
465+ """
466+ Tests for ClickHouse MATERIALIZED VIEW TO <target> lineage rewriting
467+ in LineageParser.clean_raw_query.
468+ """
469+
470+ def test_simple_materialized_view_to (self ):
471+ """
472+ Basic MATERIALIZED VIEW ... TO target AS SELECT is rewritten so
473+ the target table — not the MV — appears as the write table.
474+ """
475+ query = (
476+ "CREATE MATERIALIZED VIEW default.my_mv "
477+ "TO default.my_target "
478+ "AS SELECT * FROM default.my_source"
479+ )
480+ cleaned = LineageParser .clean_raw_query (query )
481+ self .assertIsNotNone (cleaned )
482+ self .assertIn ("CREATE TABLE default.my_target" , cleaned )
483+ self .assertNotIn ("my_mv" , cleaned )
484+
485+ def test_materialized_view_with_engine_clause (self ):
486+ """
487+ ENGINE clause between TO and AS must not be captured as the target name.
488+ """
489+ query = (
490+ "CREATE MATERIALIZED VIEW db.mv_name "
491+ "TO db.target_table "
492+ "ENGINE = MergeTree() "
493+ "AS SELECT id, name FROM db.source_table"
494+ )
495+ cleaned = LineageParser .clean_raw_query (query )
496+ self .assertIsNotNone (cleaned )
497+ self .assertIn ("CREATE TABLE db.target_table" , cleaned )
498+ self .assertNotIn ("ENGINE" , cleaned .split ("AS SELECT" )[0 ])
499+
500+ def test_materialized_view_if_not_exists (self ):
501+ """
502+ IF NOT EXISTS variant should still resolve the correct target table.
503+ """
504+ query = (
505+ "CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.mv_clicks "
506+ "TO analytics.clicks_agg "
507+ "AS SELECT page, count(*) as cnt FROM analytics.raw_clicks GROUP BY page"
508+ )
509+ cleaned = LineageParser .clean_raw_query (query )
510+ self .assertIsNotNone (cleaned )
511+ self .assertIn ("CREATE TABLE analytics.clicks_agg" , cleaned )
512+
513+ def test_materialized_view_on_cluster (self ):
514+ """
515+ ON CLUSTER clause should be skipped and not interfere with TO parsing.
516+ """
517+ query = (
518+ "CREATE MATERIALIZED VIEW cluster_db.mv_events "
519+ "ON CLUSTER my_cluster "
520+ "TO cluster_db.events_summary "
521+ "AS SELECT event_type, count() FROM cluster_db.events GROUP BY event_type"
522+ )
523+ cleaned = LineageParser .clean_raw_query (query )
524+ self .assertIsNotNone (cleaned )
525+ self .assertIn ("CREATE TABLE cluster_db.events_summary" , cleaned )
526+
527+ def test_materialized_view_without_to_clause_unchanged (self ):
528+ """
529+ A MATERIALIZED VIEW without a TO clause should not be transformed —
530+ the query is returned as-is (or filtered by the caller).
531+ """
532+ query = (
533+ "CREATE MATERIALIZED VIEW my_mv AS SELECT * FROM source_table"
534+ )
535+ cleaned = LineageParser .clean_raw_query (query )
536+ # Without TO, no transformation; the query should pass through unchanged
537+ self .assertIsNotNone (cleaned )
538+ self .assertNotIn ("CREATE TABLE" , cleaned )
539+
540+ def test_clean_query_produces_correct_lineage (self ):
541+ """
542+ End-to-end: after clean_raw_query the LineageParser should identify
543+ the TO target as the write table, not the MV name.
544+ """
545+ query = (
546+ "CREATE MATERIALIZED VIEW default.mv_orders "
547+ "TO default.orders_agg "
548+ "AS SELECT customer_id, sum(total) FROM default.orders GROUP BY customer_id"
549+ )
550+ cleaned = LineageParser .clean_raw_query (query )
551+ self .assertIsNotNone (cleaned )
552+ parser = LineageParser (cleaned )
553+ target_names = [str (t ) for t in parser .target_tables ]
554+ source_names = [str (t ) for t in parser .source_tables ]
555+ self .assertTrue (
556+ any ("orders_agg" in t for t in target_names ),
557+ f"Expected orders_agg in targets, got: { target_names } " ,
558+ )
559+ self .assertTrue (
560+ any ("orders" in s for s in source_names ),
561+ f"Expected orders in sources, got: { source_names } " ,
562+ )
563+
0 commit comments