From 0edfac751ee772228b2cc33ba63083f55a1b5737 Mon Sep 17 00:00:00 2001
From: Pablo Takara
Date: Wed, 22 Apr 2026 14:21:08 +0200
Subject: [PATCH] Fix TypeError in Databricks get_columns on non-whitelisted
 DESCRIBE markers

DESCRIBE TABLE EXTENDED emits '#'-prefixed section markers beyond the
hardcoded whitelist (e.g. '# Metadata Columns' from Spark v2
DescribeTableExec), causing re.search on a None col_type to raise
TypeError and sql_column_handler to drop all columns for the table.

Treat any '#'-prefixed row or empty col_type as end-of-columns and
break; skip individual rows whose col_type has no leading word
characters. Add a per-row try/except so unexpected failures skip one
column instead of the whole table. Use len(result) for ordinal_position
to keep values contiguous.
---
 .../source/database/databricks/metadata.py    | 109 ++++++----
 .../database/test_databricks_get_columns.py   | 198 ++++++++++++++++++
 2 files changed, 261 insertions(+), 46 deletions(-)
 create mode 100644 ingestion/tests/unit/topology/database/test_databricks_get_columns.py

diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py
index 6dfb59a77b5f..4a6f3cbe5dd6 100644
--- a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py
@@ -182,55 +182,72 @@ def get_columns(self, connection, table_name, schema=None, **kw):
     rows = _get_column_rows(self, connection, table_name, schema, kw.get("db_name"))
     result = []
-    for ordinal_position, (col_name, col_type, _comment) in enumerate(rows):
-        # Handle both oss hive and Databricks' hive partition header, respectively
-        if col_name in (
-            "# Partition Information",
-            "# Partitioning",
-            "# Clustering Information",
-            "# Delta Statistics Columns",
-            "# Detailed Table Information",
-            "# Delta Uniform Iceberg",
-        ):
+    for col_name, col_type, _comment in rows:
+        # DESCRIBE TABLE EXTENDED emits real columns first, then '#'-prefixed
+        # section markers (e.g. '# Partition Information', '# Metadata Columns',
+        # '# Detailed Table Information', '# Constraints'). Spark's v2
+        # DescribeTableExec can emit markers not in any hardcoded whitelist, so
+        # treat any '#'-prefixed row or row with empty col_type as end-of-columns.
+        # ('# col_name' sub-header is filtered upstream in _get_column_rows.)
+        if col_name.startswith("#") or not col_type:
+            logger.debug(
+                f"End of columns for {schema}.{table_name}: found end-of-columns marker {col_name!r}"
+            )
             break
-        # Take out the more detailed type information
-        # e.g. 'map<int,int>' -> 'map'
-        #      'decimal(10,1)' -> decimal
-        raw_col_type = col_type
-        col_type = re.search(r"^\w+", col_type).group(0)
         try:
-            coltype = _type_map[col_type]
-        except KeyError:
-            util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
-            coltype = types.NullType
-
-        col_info = {
-            "name": col_name,
-            "type": coltype,
-            "nullable": True,
-            "default": None,
-            "comment": _comment,
-            "system_data_type": raw_col_type,
-            "ordinal_position": ordinal_position,
-        }
-        if col_type in {"array", "struct", "map"}:
-            try:
-                rows = {
-                    r[0]: r[1]
-                    for r in connection.execute(
-                        text(
-                            f"DESCRIBE TABLE `{kw.get('db_name')}`.`{schema}`.`{table_name}` `{col_name}`"
-                        )
-                    ).fetchall()
-                }
-                col_info["system_data_type"] = rows["data_type"]
-                col_info["is_complex"] = True
-            except DatabaseError as err:
-                logger.error(
-                    f"Failed to fetch column details for column {col_name} in table {table_name} due to: {err}"
-                )
-                logger.debug(traceback.format_exc())
-        result.append(col_info)
+            # Take out the more detailed type information
+            # e.g. 'map<int,int>' -> 'map', 'decimal(10,1)' -> 'decimal'
+            raw_col_type = col_type
+            type_match = re.search(r"^\w+", col_type)
+            if type_match is None:
+                logger.warning(
+                    f"Skipping column '{col_name}' in {schema}.{table_name}: "
+                    f"unparseable col_type '{col_type}'"
+                )
+                continue
+            col_type = type_match.group(0)
+
+            try:
+                coltype = _type_map[col_type]
+            except KeyError:
+                util.warn(f"Did not recognize type '{col_type}' of column '{col_name}'")
+                coltype = types.NullType
+
+            col_info = {
+                "name": col_name,
+                "type": coltype,
+                "nullable": True,
+                "default": None,
+                "comment": _comment,
+                "system_data_type": raw_col_type,
+                "ordinal_position": len(result),
+            }
+            if col_type in {"array", "struct", "map"}:
+                try:
+                    sub_rows = {
+                        r[0]: r[1]
+                        for r in connection.execute(
+                            text(
+                                f"DESCRIBE TABLE `{kw.get('db_name')}`.`{schema}`"
+                                f".`{table_name}` `{col_name}`"
+                            )
+                        ).fetchall()
+                    }
+                    col_info["system_data_type"] = sub_rows["data_type"]
+                    col_info["is_complex"] = True
+                except (DatabaseError, KeyError) as err:
+                    logger.error(
+                        f"Failed to fetch complex-type details for column "
+                        f"{col_name} in table {table_name}: {err}"
+                    )
+                    logger.debug(traceback.format_exc())
+            result.append(col_info)
+        except Exception as err:  # pylint: disable=broad-except
+            logger.warning(
+                f"Skipping column '{col_name}' in {schema}.{table_name} due to "
+                f"unexpected error: {err}"
+            )
+            logger.debug(traceback.format_exc())
     return result
diff --git a/ingestion/tests/unit/topology/database/test_databricks_get_columns.py b/ingestion/tests/unit/topology/database/test_databricks_get_columns.py
new file mode 100644
index 000000000000..6933880bdd77
--- /dev/null
+++ b/ingestion/tests/unit/topology/database/test_databricks_get_columns.py
@@ -0,0 +1,198 @@
+# Copyright 2026 Collate
+# Licensed under the Collate Community License, Version 1.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Regression tests for the Databricks ``get_columns`` override.
+
+Incident: DESCRIBE TABLE EXTENDED on Unity Catalog / DSv2 tables (streaming,
+Iceberg, foreign tables, and others handled by Spark's ``DescribeTableExec``)
+emits section markers not present in the connector's historical whitelist —
+notably ``# Metadata Columns``, which appears BEFORE ``# Detailed Table
+Information``. Upstream ``_get_column_rows`` normalizes the empty col_type
+cell on those marker rows to ``None``, and the column-name-only filter lets
+them survive. ``get_columns`` then called ``re.search(r"^\\w+", col_type)``
+on ``None`` and raised ``TypeError: expected string or bytes-like object``.
+``sql_column_handler`` swallowed the exception and returned zero columns,
+which the topology runner treated as "no change" — silently dropping column
+metadata for every affected table.
+
+The fix generalizes the loop's end-of-columns detection: any row whose
+col_name starts with ``#`` or whose col_type is empty terminates the columns
+block. This matches Spark's emission order for both the v1
+(``DescribeTableCommand``) and v2 (``DescribeTableExec``) paths without
+hardcoding a marker list.
+"""
+
+from unittest.mock import Mock, patch
+
+import pytest
+
+from metadata.ingestion.source.database.databricks.metadata import get_columns
+
+
+@patch("metadata.ingestion.source.database.databricks.metadata._get_column_rows")
+class TestDatabricksGetColumnsSectionBoundary:
+    """End-of-columns detection: generic '#'-prefix and empty col_type break."""
+
+    def setup_method(self):
+        self.mock_self = Mock()
+        self.mock_connection = Mock()
+
+    def _run(self):
+        return get_columns(
+            self.mock_self,
+            self.mock_connection,
+            "t",
+            "s",
+            db_name="db",
+        )
+
+    def test_unknown_hash_marker_before_detailed_info_breaks_loop(self, mock_rows):
+        """DescribeTableExec emits a marker (``# Metadata Columns``) before
+        ``# Detailed Table Information``, with an empty col_type. Without the
+        fix the loop would reach ``re.search`` on None and raise TypeError;
+        with the fix the loop breaks at the marker."""
+        mock_rows.return_value = [
+            ("id", "bigint", None),
+            ("name", "string", None),
+            ("# Metadata Columns", None, None),
+            ("_metadata", "struct<...>", None),
+            ("# Detailed Table Information", None, None),
+            ("Catalog", "my_catalog", None),
+            ("Location", "s3://...", None),
+        ]
+
+        result = self._run()
+
+        assert [col["name"] for col in result] == ["id", "name"]
+        assert [col["ordinal_position"] for col in result] == [0, 1]
+
+    def test_empty_col_type_breaks_loop(self, mock_rows):
+        """Exact shape of the prod crash row: non-empty col_name, col_type=None."""
+        mock_rows.return_value = [
+            ("id", "bigint", None),
+            ("weird_marker_row", None, None),
+            ("should_not_appear", "string", None),
+        ]
+
+        result = self._run()
+
+        assert [col["name"] for col in result] == ["id"]
+
+    def test_empty_string_col_type_breaks_loop(self, mock_rows):
+        """``_get_column_rows`` normalizes empty strings to None, but guard
+        defensively against either shape reaching the loop."""
+        mock_rows.return_value = [
+            ("id", "bigint", None),
+            ("weird_marker_row", "", None),
+        ]
+
+        result = self._run()
+
+        assert [col["name"] for col in result] == ["id"]
+
+    def test_known_whitelisted_markers_still_break(self, mock_rows):
+        """Regression: previously whitelisted markers continue to terminate the
+        columns block."""
+        for marker in (
+            "# Partition Information",
+            "# Partitioning",
+            "# Clustering Information",
+            "# Delta Statistics Columns",
+            "# Detailed Table Information",
+            "# Delta Uniform Iceberg",
+        ):
+            mock_rows.return_value = [
+                ("id", "int", None),
+                (marker, None, None),
+                ("must_not_leak", "string", None),
+            ]
+
+            result = self._run()
+
+            assert [col["name"] for col in result] == [
+                "id"
+            ], f"marker {marker!r} should break the loop"
+
+    def test_detailed_info_metadata_rows_are_not_treated_as_columns(self, mock_rows):
+        """Post-break, rows like ``Name``, ``Catalog``, ``Location`` inside
+        ``# Detailed Table Information`` (which have a non-empty col_type — a
+        path, a catalog name, etc.) must not be emitted as fake columns."""
+        mock_rows.return_value = [
+            ("id", "bigint", None),
+            ("# Detailed Table Information", None, None),
+            ("Name", "my_catalog.my_schema.my_table", None),
+            ("Location", "s3://bucket/path", None),
+            ("Provider", "delta", None),
+            ("Owner", "user@example.com", None),
+        ]
+
+        result = self._run()
+
+        assert [col["name"] for col in result] == ["id"]
+
+    def test_ordinal_positions_are_contiguous_when_loop_breaks_early(self, mock_rows):
+        """Ordinal positions stay contiguous and match the column order in
+        Databricks when the loop breaks at a section marker."""
+        mock_rows.return_value = [
+            ("first", "bigint", None),
+            ("second", "string", None),
+            ("third", "int", None),
+            ("# Metadata Columns", None, None),
+            ("_metadata", "struct<...>", None),
+        ]
+
+        result = self._run()
+
+        assert [col["ordinal_position"] for col in result] == [0, 1, 2]
+
+    def test_ordinal_positions_contiguous_when_a_column_is_skipped(self, mock_rows):
+        """If a row fails per-column processing (e.g. an
+        unparseable col_type), surviving columns keep contiguous ordinal
+        positions — no gaps. The skipped row's col_type must be non-empty
+        (an empty one would hit the break path instead) but have no leading
+        word characters, so it takes the skip-and-continue path."""
+        mock_rows.return_value = [
+            ("good1", "bigint", None),
+            ("unparseable", "???", None),
+            ("good2", "string", None),
+            ("good3", "int", None),
+        ]
+
+        result = self._run()
+
+        assert [col["name"] for col in result] == ["good1", "good2", "good3"]
+        assert [col["ordinal_position"] for col in result] == [0, 1, 2]
+
+    def test_unexpected_exception_in_row_does_not_drop_other_columns(self, mock_rows):
+        """Broad per-row try/except ensures one bad column doesn't lose the
+        rest of the table's columns via the outer sql_column_handler catch.
+
+        A ``struct`` column triggers the complex-type subquery path. Forcing
+        ``connection.execute`` to raise a ``RuntimeError`` (not caught by the
+        inner ``(DatabaseError, KeyError)`` handler) bubbles up to the outer
+        ``except Exception``, which should skip the bad column and continue
+        processing subsequent rows."""
+        mock_rows.return_value = [
+            ("good1", "bigint", None),
+            ("complex_col", "struct", None),
+            ("good2", "string", None),
+        ]
+        self.mock_connection.execute.side_effect = RuntimeError(
+            "simulated subquery failure"
+        )
+
+        result = self._run()
+
+        names = [col["name"] for col in result]
+        assert "good1" in names
+        assert "good2" in names
+        assert "complex_col" not in names
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v"])
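
Note for reviewers: a minimal standalone sketch of the failure mode the commit
message describes and of the generalized guard. The row tuples below are
illustrative (made-up values in the three-column shape _get_column_rows
yields), not captured production output.

    import re

    # Real columns first, then a '#'-prefixed section marker with no data type.
    rows = [
        ("id", "bigint", None),
        ("name", "string", None),
        ("# Metadata Columns", None, None),  # v2 DescribeTableExec marker
        ("_metadata", "struct<...>", None),
    ]

    # Old behaviour: the marker is not in the whitelist, so the loop reaches
    # re.search on a None col_type and raises TypeError.
    try:
        for _col_name, col_type, _comment in rows:
            re.search(r"^\w+", col_type).group(0)
    except TypeError as err:
        print(f"old loop crashes: {err}")  # expected string or bytes-like object

    # New behaviour: any '#'-prefixed row or empty col_type ends the columns block.
    columns = []
    for col_name, col_type, _comment in rows:
        if col_name.startswith("#") or not col_type:
            break
        columns.append(col_name)
    print(columns)  # ['id', 'name']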
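
A second sketch, for the ordinal_position change: enumerate() numbers input
rows, so a skipped row would leave a gap, while len(result) numbers emitted
columns and stays contiguous. Values are again hypothetical.

    import re

    rows = [("good1", "bigint"), ("unparseable", "???"), ("good2", "string")]

    result = []
    for name, col_type in rows:
        # Mirror the patch's skip path: col_type with no leading word characters.
        if re.search(r"^\w+", col_type) is None:
            continue
        # len(result) assigns the next free position, so skips leave no gap;
        # enumerate() would have produced positions 0 and 2 here.
        result.append({"name": name, "ordinal_position": len(result)})

    print([(c["name"], c["ordinal_position"]) for c in result])
    # [('good1', 0), ('good2', 1)]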