Skip to content

Commit 1338bbc

Browse files
committed
fix: add iceberg_type column for SqlCatalog
1 parent c6a6c06 commit 1338bbc

2 files changed

Lines changed: 94 additions & 1 deletion

File tree

pyiceberg/catalog/sql.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
delete,
2828
insert,
2929
select,
30+
text,
3031
union,
3132
update,
3233
)
@@ -92,6 +93,7 @@ class IcebergTables(SqlCatalogBaseTable):
9293
table_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
9394
metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True)
9495
previous_metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True)
96+
iceberg_type: Mapped[str | None] = mapped_column(String(5), nullable=True, default="TABLE")
9597

9698

9799
class IcebergNamespaceProperties(SqlCatalogBaseTable):
@@ -133,6 +135,8 @@ def __init__(self, name: str, **properties: str):
133135
if init_catalog_tables:
134136
self._ensure_tables_exist()
135137

138+
self._update_tables_if_required()
139+
136140
def _ensure_tables_exist(self) -> None:
137141
with Session(self.engine) as session:
138142
for table in [IcebergTables, IcebergNamespaceProperties]:
@@ -146,6 +150,15 @@ def _ensure_tables_exist(self) -> None:
146150
self.create_tables()
147151
return
148152

153+
def _update_tables_if_required(self) -> None:
154+
with Session(self.engine) as session:
155+
stmt = f"ALTER TABLE {IcebergTables.__tablename__} ADD COLUMN iceberg_type VARCHAR(5)"
156+
try:
157+
session.execute(text(stmt))
158+
session.commit()
159+
except (OperationalError, ProgrammingError):
160+
session.rollback()
161+
149162
def create_tables(self) -> None:
150163
SqlCatalogBaseTable.metadata.create_all(self.engine)
151164

tests/catalog/test_sql.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing import cast
2121

2222
import pytest
23-
from sqlalchemy import Engine, create_engine, inspect
23+
from sqlalchemy import Engine, create_engine, inspect, text
2424
from sqlalchemy.exc import ArgumentError
2525

2626
from pyiceberg.catalog import load_catalog
@@ -261,3 +261,83 @@ def test_sql_catalog_multiple_close_calls(self, catalog_sqlite: SqlCatalog) -> N
261261

262262
# Second close should not raise an exception
263263
catalog_sqlite.close()
264+
265+
266+
def _create_pre_migration_schema_tables(engine: Engine) -> None:
267+
with engine.connect() as conn:
268+
conn.execute(
269+
text(
270+
"CREATE TABLE iceberg_tables ("
271+
" catalog_name VARCHAR(255) NOT NULL,"
272+
" table_namespace VARCHAR(255) NOT NULL,"
273+
" table_name VARCHAR(255) NOT NULL,"
274+
" metadata_location VARCHAR(1000),"
275+
" previous_metadata_location VARCHAR(1000),"
276+
" PRIMARY KEY (catalog_name, table_namespace, table_name)"
277+
")"
278+
)
279+
)
280+
conn.execute(
281+
text(
282+
"CREATE TABLE iceberg_namespace_properties ("
283+
" catalog_name VARCHAR(255) NOT NULL,"
284+
" namespace VARCHAR(255) NOT NULL,"
285+
" property_key VARCHAR(255) NOT NULL,"
286+
" property_value VARCHAR(1000) NOT NULL,"
287+
" PRIMARY KEY (catalog_name, namespace, property_key)"
288+
")"
289+
)
290+
)
291+
conn.commit()
292+
293+
294+
def get_columns(engine: Engine) -> set[str]:
295+
return {c["name"] for c in inspect(engine).get_columns("iceberg_tables")}
296+
297+
298+
def test_adds_iceberg_type_column_to_old_schema(warehouse: Path) -> None:
299+
# Create the old schema tables
300+
uri = f"sqlite:////{warehouse}/test-migration-add-col"
301+
engine = create_engine(uri)
302+
with engine.connect() as conn:
303+
conn.execute(
304+
text(
305+
"CREATE TABLE iceberg_tables ("
306+
" catalog_name VARCHAR(255) NOT NULL,"
307+
" table_namespace VARCHAR(255) NOT NULL,"
308+
" table_name VARCHAR(255) NOT NULL,"
309+
" metadata_location VARCHAR(1000),"
310+
" previous_metadata_location VARCHAR(1000),"
311+
" PRIMARY KEY (catalog_name, table_namespace, table_name)"
312+
")"
313+
)
314+
)
315+
conn.execute(
316+
text(
317+
"CREATE TABLE iceberg_namespace_properties ("
318+
" catalog_name VARCHAR(255) NOT NULL,"
319+
" namespace VARCHAR(255) NOT NULL,"
320+
" property_key VARCHAR(255) NOT NULL,"
321+
" property_value VARCHAR(1000) NOT NULL,"
322+
" PRIMARY KEY (catalog_name, namespace, property_key)"
323+
")"
324+
)
325+
)
326+
conn.commit()
327+
328+
# Verify the column does not exist in the old schema
329+
assert "iceberg_type" not in get_columns(engine)
330+
331+
# Load the catalog and verify the column exists
332+
catalog = SqlCatalog("test", uri=uri, warehouse=f"file://{warehouse}", init_catalog_tables="false")
333+
assert "iceberg_type" in get_columns(catalog.engine)
334+
335+
336+
def test_idempotent_when_column_already_exists(warehouse: Path) -> None:
337+
# Verify the column was created by the init_tables call
338+
catalog = SqlCatalog("test", uri="sqlite:///:memory:", warehouse=f"file://{warehouse}")
339+
assert "iceberg_type" in get_columns(catalog.engine)
340+
341+
# Verify the method is idempotent by calling it again
342+
catalog._update_tables_if_required()
343+
assert "iceberg_type" in get_columns(catalog.engine)

0 commit comments

Comments
 (0)