
Commit 0edfac7

Fix TypeError in Databricks get_columns on non-whitelisted DESCRIBE markers
DESCRIBE TABLE EXTENDED emits '#'-prefixed section markers beyond the hardcoded whitelist (e.g. '# Metadata Columns' from Spark v2 DescribeTableExec), causing re.search on a None col_type to raise TypeError and sql_column_handler to drop all columns for the table. Treat any '#'-prefixed row or empty col_type as end-of-columns and break; skip any individual column whose col_type has no leading word chars. Add a per-row try/except so unexpected failures skip one column instead of the whole table. Use len(result) for ordinal_position to keep values contiguous.
1 parent a1f5de7 commit 0edfac7
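
For context, here is a minimal sketch of the failure mode described above. The row shapes are modeled on the commit description (not captured from a real cluster), and the loop shows the generalized boundary check with the old crash noted in comments:

import re

# DESCRIBE TABLE EXTENDED row shape: (col_name, col_type, comment).
# Marker rows such as '# Metadata Columns' carry col_type=None, and that
# marker was missing from the old hardcoded whitelist.
rows = [
    ("id", "bigint", None),
    ("# Metadata Columns", None, None),
    ("_metadata", "struct<...>", None),
]

for col_name, col_type, _comment in rows:
    if col_name.startswith("#") or not col_type:  # the generalized boundary
        break
    # The old code called re.search(r"^\w+", col_type) unconditionally, so a
    # marker row that slipped past the whitelist raised:
    #   TypeError: expected string or bytes-like object
    print(col_name, re.search(r"^\w+", col_type).group(0))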

2 files changed

Lines changed: 261 additions & 46 deletions
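
The last point in the commit message (len(result) for ordinal_position) is easy to check in isolation. A minimal sketch with hypothetical row names, showing why counting emitted columns rather than iterated rows keeps positions gap-free:

# enumerate() would count skipped rows and leave gaps in ordinal_position;
# len(result) counts only the columns actually emitted.
rows = ["good1", "skip_me", "good2"]
result = []
for name in rows:
    if name == "skip_me":
        continue  # simulates a per-row failure being skipped
    result.append({"name": name, "ordinal_position": len(result)})
assert [c["ordinal_position"] for c in result] == [0, 1]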

File tree

ingestion/src/metadata/ingestion/source/database/databricks/metadata.py

Lines changed: 63 additions & 46 deletions
@@ -182,55 +182,72 @@ def get_columns(self, connection, table_name, schema=None, **kw):
 
     rows = _get_column_rows(self, connection, table_name, schema, kw.get("db_name"))
     result = []
-    for ordinal_position, (col_name, col_type, _comment) in enumerate(rows):
-        # Handle both oss hive and Databricks' hive partition header, respectively
-        if col_name in (
-            "# Partition Information",
-            "# Partitioning",
-            "# Clustering Information",
-            "# Delta Statistics Columns",
-            "# Detailed Table Information",
-            "# Delta Uniform Iceberg",
-        ):
+    for col_name, col_type, _comment in rows:
+        # DESCRIBE TABLE EXTENDED emits real columns first, then '#'-prefixed
+        # section markers (e.g. '# Partition Information', '# Metadata Columns',
+        # '# Detailed Table Information', '# Constraints'). Spark's v2
+        # DescribeTableExec can emit markers not in any hardcoded whitelist, so
+        # treat any '#'-prefixed row or row with empty col_type as end-of-columns.
+        # ('# col_name' sub-header is filtered upstream in _get_column_rows.)
+        if col_name.startswith("#") or not col_type:
+            logger.debug(
+                f"End of columns for {schema}.{table_name}. Found end-of-columns marker: {col_name}. Stopping column extraction."
+            )
             break
-        # Take out the more detailed type information
-        # e.g. 'map<ixnt,int>' -> 'map'
-        #      'decimal(10,1)' -> decimal
-        raw_col_type = col_type
-        col_type = re.search(r"^\w+", col_type).group(0)
         try:
-            coltype = _type_map[col_type]
-        except KeyError:
-            util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
-            coltype = types.NullType
-
-        col_info = {
-            "name": col_name,
-            "type": coltype,
-            "nullable": True,
-            "default": None,
-            "comment": _comment,
-            "system_data_type": raw_col_type,
-            "ordinal_position": ordinal_position,
-        }
-        if col_type in {"array", "struct", "map"}:
-            try:
-                rows = {
-                    r[0]: r[1]
-                    for r in connection.execute(
-                        text(
-                            f"DESCRIBE TABLE `{kw.get('db_name')}`.`{schema}`.`{table_name}` `{col_name}`"
-                        )
-                    ).fetchall()
-                }
-                col_info["system_data_type"] = rows["data_type"]
-                col_info["is_complex"] = True
-            except DatabaseError as err:
-                logger.error(
-                    f"Failed to fetch column details for column {col_name} in table {table_name} due to: {err}"
+            # Take out the more detailed type information
+            # e.g. 'map<int,int>' -> 'map', 'decimal(10,1)' -> 'decimal'
+            raw_col_type = col_type
+            type_match = re.search(r"^\w+", col_type)
+            if type_match is None:
+                logger.warning(
+                    f"Skipping column '{col_name}' in {schema}.{table_name}: "
+                    f"unparseable col_type '{col_type}'"
                 )
-                logger.debug(traceback.format_exc())
-        result.append(col_info)
+                continue
+            col_type = type_match.group(0)
+
+            try:
+                coltype = _type_map[col_type]
+            except KeyError:
+                util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
+                coltype = types.NullType
+
+            col_info = {
+                "name": col_name,
+                "type": coltype,
+                "nullable": True,
+                "default": None,
+                "comment": _comment,
+                "system_data_type": raw_col_type,
+                "ordinal_position": len(result),
+            }
+            if col_type in {"array", "struct", "map"}:
+                try:
+                    sub_rows = {
+                        r[0]: r[1]
+                        for r in connection.execute(
+                            text(
+                                f"DESCRIBE TABLE `{kw.get('db_name')}`.`{schema}`"
+                                f".`{table_name}` `{col_name}`"
+                            )
+                        ).fetchall()
+                    }
+                    col_info["system_data_type"] = sub_rows["data_type"]
+                    col_info["is_complex"] = True
+                except (DatabaseError, KeyError) as err:
+                    logger.error(
+                        f"Failed to fetch complex-type details for column "
+                        f"{col_name} in table {table_name}: {err}"
+                    )
+                    logger.debug(traceback.format_exc())
+            result.append(col_info)
+        except Exception as err:  # pylint: disable=broad-except
+            logger.warning(
+                f"Skipping column '{col_name}' in {schema}.{table_name} due to "
+                f"unexpected error: {err}"
+            )
+            logger.debug(traceback.format_exc())
     return result
 
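
The complex-type branch above re-queries DESCRIBE for a single column and indexes the result rows by their first field. Spark's per-column DESCRIBE returns (info_name, info_value) pairs that include a data_type row; a sketch of the shape the lookup assumes, with illustrative values:

# Hypothetical rows from: DESCRIBE TABLE `db`.`s`.`t` `complex_col`
rows = [
    ("col_name", "complex_col"),
    ("data_type", "struct<a:int,b:string>"),
    ("comment", None),
]
sub_rows = {r[0]: r[1] for r in rows}
assert sub_rows["data_type"] == "struct<a:int,b:string>"
# If 'data_type' were ever absent, the KeyError now lands in the
# (DatabaseError, KeyError) handler instead of escaping get_columns.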

Lines changed: 198 additions & 0 deletions
@@ -0,0 +1,198 @@
# Copyright 2026 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Regression tests for the databricks `get_columns` override.

Incident: DESCRIBE TABLE EXTENDED on Unity Catalog / DSv2 tables (streaming,
Iceberg, foreign tables, and others handled by Spark's ``DescribeTableExec``)
emits section markers not present in the connector's historical whitelist —
notably ``# Metadata Columns`` which appears BEFORE ``# Detailed Table
Information``. Upstream ``_get_column_rows`` normalizes the empty col_type
cell on those marker rows to ``None`` and the column-name-only filter lets
them survive. ``get_columns`` then called ``re.search(r"^\\w+", col_type)``
on ``None`` and raised ``TypeError: expected string or bytes-like object``.
``sql_column_handler`` swallowed the exception and returned zero columns,
which the topology runner treated as "no change" — silently dropping column
metadata for every affected table.

The fix generalizes the loop's end-of-columns detection: any row whose
col_name starts with ``#`` or whose col_type is empty terminates the columns
block. This matches Spark's emission order for both v1 (``DescribeTableCommand``)
and v2 (``DescribeTableExec``) paths without hardcoding a marker list.
"""

from unittest.mock import Mock, patch

import pytest

from metadata.ingestion.source.database.databricks.metadata import get_columns


@patch("metadata.ingestion.source.database.databricks.metadata._get_column_rows")
class TestDatabricksGetColumnsSectionBoundary:
    """End-of-columns detection: generic '#'-prefix and empty col_type break."""

    def setup_method(self):
        self.mock_self = Mock()
        self.mock_connection = Mock()

    def _run(self):
        return get_columns(
            self.mock_self,
            self.mock_connection,
            "t",
            "s",
            db_name="db",
        )

    def test_unknown_hash_marker_before_detailed_info_breaks_loop(self, mock_rows):
        """DescribeTableExec emits ``# Metadata Columns`` before
        ``# Detailed Table Information``, with empty col_type. Without the fix
        the loop reaches ``re.search`` on None and raises TypeError. With the
        fix the loop breaks at the marker."""
        mock_rows.return_value = [
            ("id", "bigint", None),
            ("name", "string", None),
            ("# Metadata Columns", None, None),
            ("_metadata", "struct<...>", None),
            ("# Detailed Table Information", None, None),
            ("Catalog", "my_catalog", None),
            ("Location", "s3://...", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["id", "name"]
        assert [col["ordinal_position"] for col in result] == [0, 1]

    def test_empty_col_type_breaks_loop(self, mock_rows):
        """Exact shape of the prod crash row: non-empty col_name, col_type=None."""
        mock_rows.return_value = [
            ("id", "bigint", None),
            ("weird_marker_row", None, None),
            ("should_not_appear", "string", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["id"]

    def test_empty_string_col_type_breaks_loop(self, mock_rows):
        """``_get_column_rows`` normalizes empty strings to None, but guard
        defensively against either shape reaching the loop."""
        mock_rows.return_value = [
            ("id", "bigint", None),
            ("weird_marker_row", "", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["id"]

    def test_known_whitelisted_markers_still_break(self, mock_rows):
        """Regression: previously whitelisted markers continue to terminate the
        columns block."""
        for marker in (
            "# Partition Information",
            "# Partitioning",
            "# Clustering Information",
            "# Delta Statistics Columns",
            "# Detailed Table Information",
            "# Delta Uniform Iceberg",
        ):
            mock_rows.return_value = [
                ("id", "int", None),
                (marker, None, None),
                ("must_not_leak", "string", None),
            ]

            result = self._run()

            assert [col["name"] for col in result] == [
                "id"
            ], f"marker {marker!r} should break the loop"

    def test_detailed_info_metadata_rows_are_not_treated_as_columns(self, mock_rows):
        """Post-break, rows like ``Name``, ``Catalog``, ``Location`` inside
        ``# Detailed Table Information`` (which have non-empty col_type — a
        path, catalog name, etc.) must not be emitted as fake columns."""
        mock_rows.return_value = [
            ("id", "bigint", None),
            ("# Detailed Table Information", None, None),
            ("Name", "my_catalog.my_schema.my_table", None),
            ("Location", "s3://bucket/path", None),
            ("Provider", "delta", None),
            ("Owner", "user@example.com", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["id"]

    def test_ordinal_positions_are_contiguous_when_loop_breaks_early(self, mock_rows):
        """Ordinal positions stay contiguous and match the column order in
        Databricks when the loop breaks at a section marker."""
        mock_rows.return_value = [
            ("first", "bigint", None),
            ("second", "string", None),
            ("third", "int", None),
            ("# Metadata Columns", None, None),
            ("_metadata", "struct<...>", None),
        ]

        result = self._run()

        assert [col["ordinal_position"] for col in result] == [0, 1, 2]

    def test_ordinal_positions_contiguous_when_a_column_is_skipped(self, mock_rows):
        """If a row fails per-column processing (e.g. unparseable col_type),
        surviving columns keep contiguous ordinal positions — no gaps."""
        mock_rows.return_value = [
            ("good1", "bigint", None),
            ("unparseable", "<weird>", None),
            ("good2", "string", None),
            ("good3", "int", None),
        ]

        result = self._run()

        assert [col["name"] for col in result] == ["good1", "good2", "good3"]
        assert [col["ordinal_position"] for col in result] == [0, 1, 2]

    def test_unexpected_exception_in_row_does_not_drop_other_columns(self, mock_rows):
        """Broad per-row try/except ensures one bad column doesn't lose the
        rest of the table's columns via the outer sql_column_handler catch.

        A ``struct<>`` column triggers the complex-type subquery path. Forcing
        ``connection.execute`` to raise a ``RuntimeError`` (not caught by the
        inner ``(DatabaseError, KeyError)`` handler) bubbles to the outer
        ``except Exception``, which should skip the bad column and continue
        processing subsequent rows."""
        mock_rows.return_value = [
            ("good1", "bigint", None),
            ("complex_col", "struct<a:int>", None),
            ("good2", "string", None),
        ]
        self.mock_connection.execute.side_effect = RuntimeError(
            "simulated subquery failure"
        )

        result = self._run()

        names = [col["name"] for col in result]
        assert "good1" in names
        assert "good2" in names
        assert "complex_col" not in names


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
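
A note on the mocking pattern in these tests: unittest.mock applies a class-level @patch only to methods whose names start with patch.TEST_PREFIX ('test'), which is why setup_method and _run take no mock argument while every test method receives mock_rows as its final parameter. A minimal sketch with an illustrative patch target:

from unittest.mock import patch

@patch("os.getcwd")  # illustrative target, unrelated to this commit
class TestClassLevelPatch:
    def helper(self):
        # Not 'test'-prefixed: runs unpatched and receives no mock.
        return "no mock here"

    def test_mock_injected_last(self, mock_getcwd):
        mock_getcwd.return_value = "/tmp"
        import os
        assert os.getcwd() == "/tmp"  # the patched function is in effect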
