@@ -182,55 +182,72 @@ def get_columns(self, connection, table_name, schema=None, **kw):

    rows = _get_column_rows(self, connection, table_name, schema, kw.get("db_name"))
    result = []
-    for ordinal_position, (col_name, col_type, _comment) in enumerate(rows):
-        # Handle both oss hive and Databricks' hive partition header, respectively
-        if col_name in (
-            "# Partition Information",
-            "# Partitioning",
-            "# Clustering Information",
-            "# Delta Statistics Columns",
-            "# Detailed Table Information",
-            "# Delta Uniform Iceberg",
-        ):
-            break
-        # Take out the more detailed type information
-        # e.g. 'map<int,int>' -> 'map'
-        #      'decimal(10,1)' -> decimal
-        raw_col_type = col_type
-        col_type = re.search(r"^\w+", col_type).group(0)
-        try:
-            coltype = _type_map[col_type]
-        except KeyError:
-            util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
-            coltype = types.NullType
-
-        col_info = {
-            "name": col_name,
-            "type": coltype,
-            "nullable": True,
-            "default": None,
-            "comment": _comment,
-            "system_data_type": raw_col_type,
-            "ordinal_position": ordinal_position,
-        }
-        if col_type in {"array", "struct", "map"}:
-            try:
-                rows = {
-                    r[0]: r[1]
-                    for r in connection.execute(
-                        text(
-                            f"DESCRIBE TABLE `{kw.get('db_name')}`.`{schema}`.`{table_name}` `{col_name}`"
-                        )
-                    ).fetchall()
-                }
-                col_info["system_data_type"] = rows["data_type"]
-                col_info["is_complex"] = True
-            except DatabaseError as err:
-                logger.error(
-                    f"Failed to fetch column details for column {col_name} in table {table_name} due to: {err}"
-                )
-                logger.debug(traceback.format_exc())
-        result.append(col_info)
+    for col_name, col_type, _comment in rows:
+        # DESCRIBE TABLE EXTENDED emits real columns first, then '#'-prefixed
+        # section markers (e.g. '# Partition Information', '# Metadata Columns',
+        # '# Detailed Table Information', '# Constraints'). Spark's v2
+        # DescribeTableExec can emit markers not in any hardcoded whitelist, so
+        # treat any '#'-prefixed row or row with empty col_type as end-of-columns.
+        # ('# col_name' sub-header is filtered upstream in _get_column_rows.)
+        if col_name.startswith("#") or not col_type:
+            logger.debug(
+                f"End of columns for {schema}.{table_name}. Found end-of-columns marker: {col_name}. Stopping column extraction."
+            )
+            break
+        try:
+            # Take out the more detailed type information
+            # e.g. 'map<int,int>' -> 'map', 'decimal(10,1)' -> 'decimal'
+            raw_col_type = col_type
+            type_match = re.search(r"^\w+", col_type)
+            if type_match is None:
+                logger.warning(
+                    f"Skipping column '{col_name}' in {schema}.{table_name}: "
+                    f"unparseable col_type '{col_type}'"
+                )
+                continue
[Contributor comment on lines +200 to +207]

I am not sold on this. From research it seems that this should never happen. I added it here as a defensive way to continue the column extraction in any case, but I also do not like the possibility of ending up with incomplete column metadata.

Not sure whether a proper failure and skipping column extraction would be better here.

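A minimal sketch of the fail-fast alternative the comment weighs, for comparison (not the merged behavior; the plain ValueError here is illustrative, not something the codebase raises):

    # Hypothetical stricter variant: abort extraction instead of skipping the row.
    type_match = re.search(r"^\w+", col_type)
    if type_match is None:
        raise ValueError(
            f"Unparseable col_type {col_type!r} for column {col_name!r} "
            f"in {schema}.{table_name}"
        )

Failing loudly surfaces the bad row immediately, at the cost of dropping column metadata for the whole table; the merged code instead keeps partial metadata and logs a warning.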
+            col_type = type_match.group(0)
+
+            try:
+                coltype = _type_map[col_type]
+            except KeyError:
+                util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
+                coltype = types.NullType
+
+            col_info = {
+                "name": col_name,
+                "type": coltype,
+                "nullable": True,
+                "default": None,
+                "comment": _comment,
+                "system_data_type": raw_col_type,
+                "ordinal_position": len(result),
+            }
+            if col_type in {"array", "struct", "map"}:
+                try:
+                    sub_rows = {
+                        r[0]: r[1]
+                        for r in connection.execute(
+                            text(
+                                f"DESCRIBE TABLE `{kw.get('db_name')}`.`{schema}`"
+                                f".`{table_name}` `{col_name}`"
+                            )
+                        ).fetchall()
+                    }
+                    col_info["system_data_type"] = sub_rows["data_type"]
+                    col_info["is_complex"] = True
+                except (DatabaseError, KeyError) as err:
+                    logger.error(
+                        f"Failed to fetch complex-type details for column "
+                        f"{col_name} in table {table_name}: {err}"
+                    )
+                    logger.debug(traceback.format_exc())
+            result.append(col_info)
+        except Exception as err:  # pylint: disable=broad-except
+            logger.warning(
+                f"Skipping column '{col_name}' in {schema}.{table_name} due to "
+                f"unexpected error: {err}"
+            )
+            logger.debug(traceback.format_exc())
    return result
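As an aside, the pre-fix crash is reproducible with just the standard library; a minimal sketch, assuming a marker row whose empty col_type cell was normalized to None (values invented for illustration):

    import re

    col_name, col_type = "# Metadata Columns", None  # illustrative marker row

    # Pre-fix path: re.search() rejects None outright.
    try:
        re.search(r"^\w+", col_type).group(0)
    except TypeError as err:
        print(err)  # "expected string or bytes-like object"

    # Post-fix guard fires before re.search is ever reached.
    if col_name.startswith("#") or not col_type:
        print("end of columns")  # loop breaks here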


@@ -0,0 +1,198 @@
# Copyright 2026 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Regression tests for the databricks `get_columns` override.

Incident: DESCRIBE TABLE EXTENDED on Unity Catalog / DSv2 tables (streaming,
Iceberg, foreign tables, and others handled by Spark's ``DescribeTableExec``)
emits section markers not present in the connector's historical whitelist —
notably ``# Metadata Columns`` which appears BEFORE ``# Detailed Table
Information``. Upstream ``_get_column_rows`` normalizes the empty col_type
cell on those marker rows to ``None`` and the column-name-only filter lets
them survive. ``get_columns`` then called ``re.search(r"^\\w+", col_type)``
on ``None`` and raised ``TypeError: expected string or bytes-like object``.
``sql_column_handler`` swallowed the exception and returned zero columns,
which the topology runner treated as "no change" — silently dropping column
metadata for every affected table.

The fix generalizes the loop's end-of-columns detection: any row whose
col_name starts with ``#`` or whose col_type is empty terminates the columns
block. This matches Spark's emission order for both v1 (``DescribeTableCommand``)
and v2 (``DescribeTableExec``) paths without hardcoding a marker list.
"""

from unittest.mock import Mock, patch

import pytest

from metadata.ingestion.source.database.databricks.metadata import get_columns


@patch("metadata.ingestion.source.database.databricks.metadata._get_column_rows")
class TestDatabricksGetColumnsSectionBoundary:
    """End-of-columns detection: generic '#'-prefix and empty col_type break."""

    def setup_method(self):
        self.mock_self = Mock()
        self.mock_connection = Mock()

    def _run(self):
        return get_columns(
            self.mock_self,
            self.mock_connection,
            "t",
            "s",
            db_name="db",
        )

    def test_unknown_hash_marker_before_detailed_info_breaks_loop(self, mock_rows):
        """DescribeTableExec emits a marker (``# Metadata Columns``) before
        ``# Detailed Table Information``, with an empty col_type. Without the
        fix the loop reaches ``re.search`` on None and raises TypeError. With
        the fix the loop breaks at the marker."""
        mock_rows.return_value = [
            ("id", "bigint", None),
            ("name", "string", None),
            ("# Metadata Columns", None, None),
            ("_metadata", "struct<...>", None),
            ("# Detailed Table Information", None, None),
            ("Catalog", "my_catalog", None),
            ("Location", "s3://...", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["id", "name"]
        assert [col["ordinal_position"] for col in result] == [0, 1]

    def test_empty_col_type_breaks_loop(self, mock_rows):
        """Exact shape of the prod crash row: non-empty col_name, col_type=None."""
        mock_rows.return_value = [
            ("id", "bigint", None),
            ("weird_marker_row", None, None),
            ("should_not_appear", "string", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["id"]

    def test_empty_string_col_type_breaks_loop(self, mock_rows):
        """``_get_column_rows`` normalizes empty strings to None, but guard
        defensively against either shape reaching the loop."""
        mock_rows.return_value = [
            ("id", "bigint", None),
            ("weird_marker_row", "", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["id"]

    def test_known_whitelisted_markers_still_break(self, mock_rows):
        """Regression: previously whitelisted markers continue to terminate the
        columns block."""
        for marker in (
            "# Partition Information",
            "# Partitioning",
            "# Clustering Information",
            "# Delta Statistics Columns",
            "# Detailed Table Information",
            "# Delta Uniform Iceberg",
        ):
            mock_rows.return_value = [
                ("id", "int", None),
                (marker, None, None),
                ("must_not_leak", "string", None),
            ]

            result = self._run()

            assert [col["name"] for col in result] == [
                "id"
            ], f"marker {marker!r} should break the loop"

    def test_detailed_info_metadata_rows_are_not_treated_as_columns(self, mock_rows):
        """Post-break, rows like ``Name``, ``Catalog``, ``Location`` inside
        ``# Detailed Table Information`` (which have non-empty col_type — a
        path, catalog name, etc.) must not be emitted as fake columns."""
        mock_rows.return_value = [
            ("id", "bigint", None),
            ("# Detailed Table Information", None, None),
            ("Name", "my_catalog.my_schema.my_table", None),
            ("Location", "s3://bucket/path", None),
            ("Provider", "delta", None),
            ("Owner", "user@example.com", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["id"]

    def test_ordinal_positions_are_contiguous_when_loop_breaks_early(self, mock_rows):
        """Ordinal positions stay contiguous and match the column order in
        Databricks when the loop breaks at a section marker."""
        mock_rows.return_value = [
            ("first", "bigint", None),
            ("second", "string", None),
            ("third", "int", None),
            ("# Metadata Columns", None, None),
            ("_metadata", "struct<...>", None),
        ]

        result = self._run()

        assert [col["ordinal_position"] for col in result] == [0, 1, 2]

    def test_ordinal_positions_contiguous_when_a_column_is_skipped(self, mock_rows):
        """If a row fails per-column processing (e.g. unparseable col_type),
        surviving columns keep contiguous ordinal positions — no gaps."""
        mock_rows.return_value = [
            ("good1", "bigint", None),
            ("unparseable", "<weird>", None),
            ("good2", "string", None),
            ("good3", "int", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["good1", "good2", "good3"]
        assert [col["ordinal_position"] for col in result] == [0, 1, 2]

    def test_unexpected_exception_in_row_does_not_drop_other_columns(self, mock_rows):
        """Broad per-row try/except ensures one bad column doesn't lose the
        rest of the table's columns via the outer sql_column_handler catch.

        A ``struct<>`` column triggers the complex-type subquery path. Forcing
        ``connection.execute`` to raise a ``RuntimeError`` (not caught by the
        inner ``(DatabaseError, KeyError)`` handler) bubbles to the outer
        ``except Exception``, which should skip the bad column and continue
        processing subsequent rows."""
        mock_rows.return_value = [
            ("good1", "bigint", None),
            ("complex_col", "struct<a:int>", None),
            ("good2", "string", None),
        ]
[Copilot AI comment, Apr 22, 2026, on lines +171 to +184]

test_unexpected_exception_in_row_does_not_drop_other_columns doesn't currently trigger an exception inside get_columns (unknown types are handled via KeyError -> NullType). As written, it won't fail if the new broad per-row exception handling is removed. Adjust the test to force an actual exception in per-row processing (e.g., patch the _type_map lookup or the re.search/complex-type subquery to raise) and assert that subsequent columns are still returned.
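A sketch of the variant this comment suggests, assuming the metadata module binds ``re`` at import time so rebinding the module attribute stays test-local (names and the "boom" sentinel are illustrative):

    import re
    from unittest.mock import patch

    from metadata.ingestion.source.database.databricks import metadata as databricks_metadata

    class _ExplodingRe:
        """Delegates to re.search, but raises for one sentinel col_type."""

        @staticmethod
        def search(pattern, string):
            if string == "boom":
                raise RuntimeError("forced per-row failure")
            return re.search(pattern, string)

    # Inside a test, with rows [("good1", "bigint", None), ("bad", "boom", None),
    # ("good2", "string", None)], wrap the call and assert the survivors:
    #     with patch.object(databricks_metadata, "re", _ExplodingRe):
    #         result = self._run()
    #     assert [c["name"] for c in result] == ["good1", "good2"]

The merged test takes a different route: it forces a RuntimeError from connection.execute on a struct column, which reaches the same outer ``except Exception`` handler.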
        self.mock_connection.execute.side_effect = RuntimeError(
            "simulated subquery failure"
        )

        result = self._run()

        names = [col["name"] for col in result]
        assert "good1" in names
        assert "good2" in names
        assert "complex_col" not in names

[Copilot AI comment, Apr 22, 2026, on lines +172 to +195]

test_unexpected_exception_in_row_does_not_drop_other_columns doesn't currently simulate an exception in the per-row processing path: an unknown col_type just maps to NullType and still appends, so this test will pass even without the new broad per-row try/except. To actually cover the intended regression, force an exception inside the loop (e.g., patch re.search/_type_map.__getitem__ to raise for one row) and assert the remaining columns are still returned.

if __name__ == "__main__":
pytest.main([__file__, "-v"])