Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ def cache_lineage(self):
{
"source_table_full_name": row.source_table_full_name,
"target_table_full_name": row.target_table_full_name,
"source_path": getattr(row, "source_path", None),
"target_path": getattr(row, "target_path", None),
}
)
except Exception as exc:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,40 +87,50 @@

DATABRICKS_DDL = "SHOW CREATE TABLE `{table_name}`"

DATABRICKS_GET_TABLE_LINEAGE = """
SELECT
entity_id,
source_table_full_name,
target_table_full_name
FROM system.access.table_lineage
WHERE entity_type IN ('JOB', 'PIPELINE')
AND event_time >= current_date() - INTERVAL {lookback_days} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
GROUP BY entity_id, source_table_full_name, target_table_full_name
"""
DATABRICKS_GET_TABLE_LINEAGE = textwrap.dedent(
"""
SELECT
entity_id,
source_table_full_name,
target_table_full_name,
source_path,
target_path
FROM system.access.table_lineage
WHERE entity_type IN ('JOB', 'PIPELINE')
AND event_time >= current_date() - INTERVAL {lookback_days} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
GROUP BY entity_id, source_table_full_name, target_table_full_name, source_path, target_path
"""
)

DATABRICKS_GET_COLUMN_LINEAGE = """
SELECT
entity_id,
source_table_full_name,
source_column_name,
target_table_full_name,
target_column_name
FROM system.access.column_lineage
WHERE entity_type IN ('JOB', 'PIPELINE')
AND event_time >= current_date() - INTERVAL {lookback_days} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
AND source_column_name IS NOT NULL
AND target_column_name IS NOT NULL
GROUP BY
entity_id,
source_table_full_name,
source_column_name,
target_table_full_name,
target_column_name
"""
DATABRICKS_GET_COLUMN_LINEAGE = textwrap.dedent(
"""
SELECT
entity_id,
source_table_full_name,
source_column_name,
target_table_full_name,
target_column_name,
source_path,
target_path
FROM system.access.column_lineage
WHERE entity_type IN ('JOB', 'PIPELINE')
AND event_time >= current_date() - INTERVAL {lookback_days} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
AND source_column_name IS NOT NULL
AND target_column_name IS NOT NULL
GROUP BY
entity_id,
source_table_full_name,
source_column_name,
target_table_full_name,
target_column_name,
source_path,
target_path
"""
)

# Test connection queries
TEST_VIEW_DEFINITIONS = textwrap.dedent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
tuple[str, str], list[tuple[str, str]]
] = defaultdict(list)
self.external_location_map: dict[str, str] = {}
self.external_path_to_fqn: dict[str, str] = {}
self.test_connection()

def close(self):
Expand All @@ -105,7 +106,7 @@
)
return cls(config, metadata)

def _cache_lineage(self):

Check failure on line 109 in ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 24 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ3E9hSD8nV5pkv_G2bZ&open=AZ3E9hSD8nV5pkv_G2bZ&pullRequest=27648
"""
Bulk-fetch all table and column lineage from system tables into memory.
"""
Expand All @@ -124,9 +125,26 @@
)
)
for row in rows:
self.table_lineage_map[row.target_table_full_name].add(
row.source_table_full_name
)
source = row.source_table_full_name
target = row.target_table_full_name

if not source and row.source_path:
source = self.external_path_to_fqn.get(
row.source_path.rstrip("/")
)
if not target and row.target_path:
target = self.external_path_to_fqn.get(
row.target_path.rstrip("/")
)

if source and target:
self.table_lineage_map[target].add(source)
else:
logger.debug(
f"Skipping unresolvable lineage row: "
f"source_path={getattr(row, 'source_path', None)}, "
f"target_path={getattr(row, 'target_path', None)}"
)
logger.info(
f"Cached table lineage: {sum(len(v) for v in self.table_lineage_map.values())} edges "
f"for {len(self.table_lineage_map)} target tables"
Expand All @@ -145,13 +163,22 @@
)
)
for row in rows:
table_key = (
row.source_table_full_name,
row.target_table_full_name,
source = row.source_table_full_name or (
self.external_path_to_fqn.get(
row.source_path.rstrip("/")
) if row.source_path else ""
)
self.column_lineage_map[table_key].append(
(row.source_column_name, row.target_column_name)
target = row.target_table_full_name or (
self.external_path_to_fqn.get(
row.target_path.rstrip("/")
) if row.target_path else ""
)

if source and target:
table_key = (source, target)
self.column_lineage_map[table_key].append(
(row.source_column_name, row.target_column_name)
)
logger.info(
f"Cached column lineage: {sum(len(v) for v in self.column_lineage_map.values())} "
f"column mappings for {len(self.column_lineage_map)} table pairs"
Expand All @@ -173,6 +200,9 @@
f"{row.table_catalog}.{row.table_schema}.{row.table_name}"
)
self.external_location_map[table_fqn] = row.storage_path
self.external_path_to_fqn = {
v.rstrip("/"): k for k, v in self.external_location_map.items()
}
logger.info(
f"Cached {len(self.external_location_map)} external table locations"
)
Expand Down Expand Up @@ -362,8 +392,8 @@
Fetch lineage from system tables for both table-to-table
and external location lineage.
"""
self._cache_lineage()
self._cache_external_locations()
self._cache_lineage()

for database in self.metadata.list_all_entities(
entity=Database, params={"service": self.config.serviceName}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@
"""
SELECT
source_table_full_name,
target_table_full_name
target_table_full_name,
source_path,
target_path
FROM system.access.table_lineage
WHERE event_time >= current_date() - INTERVAL {query_log_duration} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
GROUP BY source_table_full_name, target_table_full_name
AND (source_table_full_name IS NOT NULL OR source_path IS NOT NULL)
AND (target_table_full_name IS NOT NULL OR target_path IS NOT NULL)
GROUP BY source_table_full_name, target_table_full_name, source_path, target_path
"""
)

Expand All @@ -75,18 +77,22 @@
source_table_full_name,
source_column_name,
target_table_full_name,
target_column_name
target_column_name,
source_path,
target_path
FROM system.access.column_lineage
WHERE event_time >= current_date() - INTERVAL {query_log_duration} DAYS
AND source_table_full_name IS NOT NULL
AND target_table_full_name IS NOT NULL
AND (source_table_full_name IS NOT NULL OR source_path IS NOT NULL)
AND (target_table_full_name IS NOT NULL OR target_path IS NOT NULL)
AND source_column_name IS NOT NULL
AND target_column_name IS NOT NULL
GROUP BY
source_table_full_name,
source_column_name,
target_table_full_name,
target_column_name
target_column_name,
source_path,
target_path
"""
)

Expand Down
Loading
Loading