diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/client.py b/ingestion/src/metadata/ingestion/source/database/databricks/client.py index 8577b477d991..12c6812d6321 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/client.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/client.py @@ -350,6 +350,8 @@ def cache_lineage(self): { "source_table_full_name": row.source_table_full_name, "target_table_full_name": row.target_table_full_name, + "source_path": getattr(row, "source_path", None), + "target_path": getattr(row, "target_path", None), } ) except Exception as exc: diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py index d421ea4b9a11..53ffca8b836a 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py @@ -87,40 +87,50 @@ DATABRICKS_DDL = "SHOW CREATE TABLE `{table_name}`" -DATABRICKS_GET_TABLE_LINEAGE = """ -SELECT - entity_id, - source_table_full_name, - target_table_full_name -FROM system.access.table_lineage -WHERE entity_type IN ('JOB', 'PIPELINE') - AND event_time >= current_date() - INTERVAL {lookback_days} DAYS - AND source_table_full_name IS NOT NULL - AND target_table_full_name IS NOT NULL -GROUP BY entity_id, source_table_full_name, target_table_full_name -""" +DATABRICKS_GET_TABLE_LINEAGE = textwrap.dedent( + """ + SELECT + entity_id, + source_table_full_name, + target_table_full_name, + source_path, + target_path + FROM system.access.table_lineage + WHERE entity_type IN ('JOB', 'PIPELINE') + AND event_time >= current_date() - INTERVAL {lookback_days} DAYS + AND source_table_full_name IS NOT NULL + AND target_table_full_name IS NOT NULL + GROUP BY entity_id, source_table_full_name, target_table_full_name, source_path, target_path + """ +) -DATABRICKS_GET_COLUMN_LINEAGE = """ -SELECT - entity_id, - source_table_full_name, - source_column_name, - target_table_full_name, - target_column_name -FROM system.access.column_lineage -WHERE entity_type IN ('JOB', 'PIPELINE') - AND event_time >= current_date() - INTERVAL {lookback_days} DAYS - AND source_table_full_name IS NOT NULL - AND target_table_full_name IS NOT NULL - AND source_column_name IS NOT NULL - AND target_column_name IS NOT NULL -GROUP BY - entity_id, - source_table_full_name, - source_column_name, - target_table_full_name, - target_column_name -""" +DATABRICKS_GET_COLUMN_LINEAGE = textwrap.dedent( + """ + SELECT + entity_id, + source_table_full_name, + source_column_name, + target_table_full_name, + target_column_name, + source_path, + target_path + FROM system.access.column_lineage + WHERE entity_type IN ('JOB', 'PIPELINE') + AND event_time >= current_date() - INTERVAL {lookback_days} DAYS + AND source_table_full_name IS NOT NULL + AND target_table_full_name IS NOT NULL + AND source_column_name IS NOT NULL + AND target_column_name IS NOT NULL + GROUP BY + entity_id, + source_table_full_name, + source_column_name, + target_table_full_name, + target_column_name, + source_path, + target_path + """ +) # Test connection queries TEST_VIEW_DEFINITIONS = textwrap.dedent( diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index 8aaace0986f2..4444dde7b7ba 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -80,6 +80,7 @@ def __init__( tuple[str, str], list[tuple[str, str]] ] = defaultdict(list) self.external_location_map: dict[str, str] = {} + self.external_path_to_fqn: dict[str, str] = {} self.test_connection() def close(self): @@ -124,9 +125,26 @@ def _cache_lineage(self): ) ) for row in rows: - self.table_lineage_map[row.target_table_full_name].add( - row.source_table_full_name - ) + source = row.source_table_full_name + target = row.target_table_full_name + + if not source and row.source_path: + source = self.external_path_to_fqn.get( + row.source_path.rstrip("/") + ) + if not target and row.target_path: + target = self.external_path_to_fqn.get( + row.target_path.rstrip("/") + ) + + if source and target: + self.table_lineage_map[target].add(source) + else: + logger.debug( + f"Skipping unresolvable lineage row: " + f"source_path={getattr(row, 'source_path', None)}, " + f"target_path={getattr(row, 'target_path', None)}" + ) logger.info( f"Cached table lineage: {sum(len(v) for v in self.table_lineage_map.values())} edges " f"for {len(self.table_lineage_map)} target tables" @@ -145,13 +163,22 @@ def _cache_lineage(self): ) ) for row in rows: - table_key = ( - row.source_table_full_name, - row.target_table_full_name, + source = row.source_table_full_name or ( + self.external_path_to_fqn.get( + row.source_path.rstrip("/") + ) if row.source_path else "" ) - self.column_lineage_map[table_key].append( - (row.source_column_name, row.target_column_name) + target = row.target_table_full_name or ( + self.external_path_to_fqn.get( + row.target_path.rstrip("/") + ) if row.target_path else "" ) + + if source and target: + table_key = (source, target) + self.column_lineage_map[table_key].append( + (row.source_column_name, row.target_column_name) + ) logger.info( f"Cached column lineage: {sum(len(v) for v in self.column_lineage_map.values())} " f"column mappings for {len(self.column_lineage_map)} table pairs" @@ -173,6 +200,9 @@ def _cache_external_locations(self): f"{row.table_catalog}.{row.table_schema}.{row.table_name}" ) self.external_location_map[table_fqn] = row.storage_path + self.external_path_to_fqn = { + v.rstrip("/"): k for k, v in self.external_location_map.items() + } logger.info( f"Cached {len(self.external_location_map)} external table locations" ) @@ -362,8 +392,8 @@ def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]: Fetch lineage from system tables for both table-to-table and external location lineage. """ - self._cache_lineage() self._cache_external_locations() + self._cache_lineage() for database in self.metadata.list_all_entities( entity=Database, params={"service": self.config.serviceName} diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py index 329c59a7d9bb..7d287c197ab0 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py @@ -60,12 +60,14 @@ """ SELECT source_table_full_name, - target_table_full_name + target_table_full_name, + source_path, + target_path FROM system.access.table_lineage WHERE event_time >= current_date() - INTERVAL {query_log_duration} DAYS - AND source_table_full_name IS NOT NULL - AND target_table_full_name IS NOT NULL - GROUP BY source_table_full_name, target_table_full_name + AND (source_table_full_name IS NOT NULL OR source_path IS NOT NULL) + AND (target_table_full_name IS NOT NULL OR target_path IS NOT NULL) + GROUP BY source_table_full_name, target_table_full_name, source_path, target_path """ ) @@ -75,18 +77,22 @@ source_table_full_name, source_column_name, target_table_full_name, - target_column_name + target_column_name, + source_path, + target_path FROM system.access.column_lineage WHERE event_time >= current_date() - INTERVAL {query_log_duration} DAYS - AND source_table_full_name IS NOT NULL - AND target_table_full_name IS NOT NULL + AND (source_table_full_name IS NOT NULL OR source_path IS NOT NULL) + AND (target_table_full_name IS NOT NULL OR target_path IS NOT NULL) AND source_column_name IS NOT NULL AND target_column_name IS NOT NULL GROUP BY source_table_full_name, source_column_name, target_table_full_name, - target_column_name + target_column_name, + source_path, + target_path """ ) diff --git a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py index 385c56bb00d0..f6b1be2dad63 100644 --- a/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py +++ b/ingestion/tests/unit/topology/database/test_unity_catalog_lineage.py @@ -92,12 +92,24 @@ def lineage_source(): class TestCacheLineage: def test_cache_table_lineage(self, lineage_source): TableRow = namedtuple( - "TableRow", ["source_table_full_name", "target_table_full_name"] + "TableRow", + [ + "source_table_full_name", + "target_table_full_name", + "source_path", + "target_path", + ], ) mock_rows = [ - TableRow("cat.schema.source1", "cat.schema.target1"), - TableRow("cat.schema.source2", "cat.schema.target1"), - TableRow("cat.schema.source1", "cat.schema.target2"), + TableRow( + "cat.schema.source1", "cat.schema.target1", None, None + ), + TableRow( + "cat.schema.source2", "cat.schema.target1", None, None + ), + TableRow( + "cat.schema.source1", "cat.schema.target2", None, None + ), ] mock_conn = MagicMock() @@ -120,7 +132,13 @@ def test_cache_table_lineage(self, lineage_source): def test_cache_column_lineage(self, lineage_source): TableRow = namedtuple( - "TableRow", ["source_table_full_name", "target_table_full_name"] + "TableRow", + [ + "source_table_full_name", + "target_table_full_name", + "source_path", + "target_path", + ], ) ColumnRow = namedtuple( "ColumnRow", @@ -129,6 +147,8 @@ def test_cache_column_lineage(self, lineage_source): "source_column_name", "target_table_full_name", "target_column_name", + "source_path", + "target_path", ], ) @@ -138,10 +158,28 @@ def mock_execute(query): nonlocal call_count call_count += 1 if call_count == 1: - return [TableRow("cat.schema.src", "cat.schema.tgt")] + return [ + TableRow( + "cat.schema.src", "cat.schema.tgt", None, None + ) + ] return [ - ColumnRow("cat.schema.src", "col_a", "cat.schema.tgt", "col_x"), - ColumnRow("cat.schema.src", "col_b", "cat.schema.tgt", "col_y"), + ColumnRow( + "cat.schema.src", + "col_a", + "cat.schema.tgt", + "col_x", + None, + None, + ), + ColumnRow( + "cat.schema.src", + "col_b", + "cat.schema.tgt", + "col_y", + None, + None, + ), ] mock_conn = MagicMock() @@ -173,6 +211,75 @@ def test_cache_lineage_handles_query_failure(self, lineage_source): assert len(lineage_source.table_lineage_map) == 0 assert len(lineage_source.column_lineage_map) == 0 + def test_cache_table_lineage_resolves_path_to_fqn(self, lineage_source): + lineage_source.external_path_to_fqn = { + "abfss://raw@store.dfs.core.windows.net/external_table": "bronze_ns.deltalake_ns.external_table", + } + + TableRow = namedtuple( + "TableRow", + [ + "source_table_full_name", + "target_table_full_name", + "source_path", + "target_path", + ], + ) + mock_rows = [ + TableRow( + None, + "bronze_ns.deltalake_ns.managed_table", + "abfss://raw@store.dfs.core.windows.net/external_table", + None, + ), + ] + + mock_conn = MagicMock() + mock_conn.execute.return_value = mock_rows + lineage_source.engine.connect.return_value.__enter__ = Mock( + return_value=mock_conn + ) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + + lineage_source._cache_lineage() + + assert ( + lineage_source.table_lineage_map["bronze_ns.deltalake_ns.managed_table"] + == {"bronze_ns.deltalake_ns.external_table"} + ) + + def test_cache_table_lineage_skips_unresolvable_path(self, lineage_source): + lineage_source.external_path_to_fqn = {} + + TableRow = namedtuple( + "TableRow", + [ + "source_table_full_name", + "target_table_full_name", + "source_path", + "target_path", + ], + ) + mock_rows = [ + TableRow( + None, + "cat.schema.target", + "abfss://unknown@store.dfs.core.windows.net/ghost", + None, + ), + ] + + mock_conn = MagicMock() + mock_conn.execute.return_value = mock_rows + lineage_source.engine.connect.return_value.__enter__ = Mock( + return_value=mock_conn + ) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + + lineage_source._cache_lineage() + + assert "cat.schema.target" not in lineage_source.table_lineage_map + class TestProcessTableLineage: def test_process_table_lineage_from_cache(self, lineage_source): @@ -384,6 +491,33 @@ def test_no_column_lineage_returns_none(self, lineage_source): class TestExternalLocationLineage: + def test_cache_external_locations_builds_reverse_map(self, lineage_source): + ExternalRow = namedtuple( + "ExternalRow", + ["table_catalog", "table_schema", "table_name", "storage_path"], + ) + mock_rows = [ + ExternalRow("cat", "schema", "ext_table1", "s3://bucket/path1"), + ExternalRow("cat", "schema", "ext_table2", "s3://bucket/path2/"), + ] + + mock_conn = MagicMock() + mock_conn.execute.return_value = mock_rows + lineage_source.engine.connect.return_value.__enter__ = Mock( + return_value=mock_conn + ) + lineage_source.engine.connect.return_value.__exit__ = Mock(return_value=False) + + lineage_source._cache_external_locations() + + assert len(lineage_source.external_location_map) == 2 + assert lineage_source.external_location_map["cat.schema.ext_table1"] == "s3://bucket/path1" + assert lineage_source.external_location_map["cat.schema.ext_table2"] == "s3://bucket/path2/" + + assert len(lineage_source.external_path_to_fqn) == 2 + assert lineage_source.external_path_to_fqn["s3://bucket/path1"] == "cat.schema.ext_table1" + assert lineage_source.external_path_to_fqn["s3://bucket/path2"] == "cat.schema.ext_table2" + def test_cache_external_locations(self, lineage_source): ExternalRow = namedtuple( "ExternalRow",