Skip to content

Commit b7b8904

Browse files
fix(datalake): address gitar-bot review on Iceberg ingestion (#22644)
- Integer version comparison (v10 > v9) via regex group capture
- Single-pass listing: eliminates double bucket scan for non-Iceberg buckets
- Mixed buckets: regular files outside Iceberg dirs are now yielded
- Removes extra head_object/get_blob API calls (use listing size directly)
- Fix get_tables_name_and_type return type annotation to 5-tuple
- Update tests: remove _get_iceberg_tables direct calls, add v10 regression
1 parent 134c1eb commit b7b8904

1 file changed

Lines changed: 78 additions & 45 deletions

File tree

ingestion/tests/unit/source/database/test_iceberg_discovery.py

Lines changed: 78 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -60,20 +60,21 @@ def _make_gcs_client(blobs: list) -> DatalakeGcsClient:
6060

6161
class TestGcsIcebergDiscovery:
6262
def test_gcs_iceberg_table_detected(self):
63+
"""Latest version of Iceberg metadata is yielded; data/ blobs are suppressed."""
6364
blobs = [
6465
_make_blob("warehouse/orders/metadata/v1.metadata.json", size=500),
6566
_make_blob("warehouse/orders/metadata/v2.metadata.json", size=600),
6667
_make_blob("warehouse/orders/data/00000-0-abc.parquet", size=8192),
6768
_make_blob("warehouse/orders/data/00001-0-def.parquet", size=9216),
6869
]
6970
client = _make_gcs_client(blobs)
70-
mock_bucket = client._client.get_bucket("bucket")
7171

72-
result = client._get_iceberg_tables(mock_bucket, prefix="warehouse")
72+
results = list(client.get_table_names("my-bucket", prefix="warehouse"))
7373

74-
assert result == {
75-
"warehouse/orders": "warehouse/orders/metadata/v2.metadata.json"
76-
}
74+
assert len(results) == 1
75+
name, size = results[0]
76+
assert name == "warehouse/orders/metadata/v2.metadata.json"
77+
assert size == 600
7778

7879
def test_gcs_iceberg_yields_one_table_per_directory(self):
7980
blobs = [
@@ -126,12 +127,9 @@ def test_gcs_fallback_for_non_iceberg(self):
126127

127128
def test_gcs_mixed_iceberg_and_regular_files(self):
128129
"""
129-
If ANY Iceberg table is detected, the client switches to Iceberg mode
130-
and yields only Iceberg tables. Regular files in the same bucket are
131-
not yielded in this scan — they are assumed to be data files belonging
132-
to Iceberg tables or unrelated objects outside the warehouse prefix.
133-
This is intentional: mixing Iceberg and non-Iceberg tables in the same
134-
prefix should be avoided in practice.
130+
Regular files NOT under any Iceberg table directory are yielded
131+
alongside the Iceberg metadata entries. Only blobs that fall under
132+
an Iceberg table's own subdirectory (data/, metadata/) are suppressed.
135133
"""
136134
blobs = [
137135
_make_blob("warehouse/orders/metadata/v1.metadata.json", size=400),
@@ -141,60 +139,78 @@ def test_gcs_mixed_iceberg_and_regular_files(self):
141139

142140
results = list(client.get_table_names("my-bucket", prefix=None))
143141

142+
assert len(results) == 2
143+
names = {r[0] for r in results}
144+
assert "warehouse/orders/metadata/v1.metadata.json" in names
145+
assert "regular_files/data.csv" in names
146+
147+
def test_gcs_iceberg_version_comparison_v10(self):
148+
"""v10 must beat v9 — lexicographic comparison would fail here."""
149+
blobs = [
150+
_make_blob("warehouse/orders/metadata/v9.metadata.json", size=500),
151+
_make_blob("warehouse/orders/metadata/v10.metadata.json", size=600),
152+
]
153+
client = _make_gcs_client(blobs)
154+
155+
results = list(client.get_table_names("my-bucket", prefix="warehouse"))
156+
144157
assert len(results) == 1
145-
name, _ = results[0]
146-
assert name == "warehouse/orders/metadata/v1.metadata.json"
158+
name, size = results[0]
159+
assert name == "warehouse/orders/metadata/v10.metadata.json"
160+
assert size == 600
147161

148162

149163
class TestS3IcebergDiscovery:
150-
def _make_s3_client(self, keys: list) -> DatalakeS3Client:
164+
def _make_s3_client(self, keys: list, sizes: dict = None) -> DatalakeS3Client:
165+
"""Helper: create a DatalakeS3Client backed by a mocked boto3 client."""
151166
mock_boto_client = MagicMock()
152167
client = DatalakeS3Client.__new__(DatalakeS3Client)
153168
client._client = mock_boto_client
154169
client._session = None
155-
156-
s3_objects = [{"Key": k, "Size": 1024} for k in keys]
157-
158-
with patch(
159-
"metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects",
160-
return_value=s3_objects,
161-
):
162-
result = client._get_iceberg_tables("my-bucket", prefix="warehouse")
163-
164170
self._mock_boto_client = mock_boto_client
165-
self._s3_objects = s3_objects
166-
return client, result
171+
sizes = sizes or {}
172+
self._s3_objects = [
173+
{"Key": k, "Size": sizes.get(k, 1024)} for k in keys
174+
]
175+
return client
167176

168177
def test_s3_iceberg_table_detected(self):
178+
"""Latest version of Iceberg metadata is yielded; data/ blobs are suppressed."""
169179
keys = [
170180
"warehouse/orders/metadata/v1.metadata.json",
171181
"warehouse/orders/metadata/v2.metadata.json",
172182
"warehouse/orders/data/00000-0-abc.parquet",
173183
]
174-
_, result = self._make_s3_client(keys)
184+
client = self._make_s3_client(
185+
keys,
186+
sizes={"warehouse/orders/metadata/v2.metadata.json": 600},
187+
)
175188

176-
assert result == {
177-
"warehouse/orders": "warehouse/orders/metadata/v2.metadata.json"
178-
}
189+
with patch(
190+
"metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects",
191+
return_value=self._s3_objects,
192+
):
193+
results = list(client.get_table_names("my-bucket", prefix="warehouse"))
194+
195+
assert len(results) == 1
196+
name, size = results[0]
197+
assert name == "warehouse/orders/metadata/v2.metadata.json"
198+
assert size == 600
179199

180200
def test_s3_iceberg_yields_one_table_per_directory(self):
181201
keys = [
182202
"warehouse/orders/metadata/v1.metadata.json",
183203
"warehouse/orders/metadata/v2.metadata.json",
184204
"warehouse/orders/data/00000-0-abc.parquet",
185205
]
186-
mock_boto_client = MagicMock()
187-
mock_boto_client.head_object.return_value = {"ContentLength": 600}
188-
189-
client = DatalakeS3Client.__new__(DatalakeS3Client)
190-
client._client = mock_boto_client
191-
client._session = None
192-
193-
s3_objects = [{"Key": k, "Size": 1024} for k in keys]
206+
client = self._make_s3_client(
207+
keys,
208+
sizes={"warehouse/orders/metadata/v2.metadata.json": 600},
209+
)
194210

195211
with patch(
196212
"metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects",
197-
return_value=s3_objects,
213+
return_value=self._s3_objects,
198214
):
199215
results = list(client.get_table_names("my-bucket", prefix="warehouse"))
200216

@@ -209,16 +225,11 @@ def test_s3_fallback_for_non_iceberg(self):
209225
"data/products.parquet",
210226
"data/users.json",
211227
]
212-
mock_boto_client = MagicMock()
213-
client = DatalakeS3Client.__new__(DatalakeS3Client)
214-
client._client = mock_boto_client
215-
client._session = None
216-
217-
s3_objects = [{"Key": k, "Size": 512} for k in keys]
228+
client = self._make_s3_client(keys)
218229

219230
with patch(
220231
"metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects",
221-
return_value=s3_objects,
232+
return_value=self._s3_objects,
222233
):
223234
results = list(client.get_table_names("my-bucket", prefix="data"))
224235

@@ -228,6 +239,28 @@ def test_s3_fallback_for_non_iceberg(self):
228239
assert "data/products.parquet" in names
229240
assert "data/users.json" in names
230241

242+
def test_s3_iceberg_version_comparison_v10(self):
243+
"""v10 must beat v9 — lexicographic comparison would fail here."""
244+
keys = [
245+
"warehouse/orders/metadata/v9.metadata.json",
246+
"warehouse/orders/metadata/v10.metadata.json",
247+
]
248+
client = self._make_s3_client(
249+
keys,
250+
sizes={"warehouse/orders/metadata/v10.metadata.json": 600},
251+
)
252+
253+
with patch(
254+
"metadata.ingestion.source.database.datalake.clients.s3.list_s3_objects",
255+
return_value=self._s3_objects,
256+
):
257+
results = list(client.get_table_names("my-bucket", prefix="warehouse"))
258+
259+
assert len(results) == 1
260+
name, size = results[0]
261+
assert name == "warehouse/orders/metadata/v10.metadata.json"
262+
assert size == 600
263+
231264

232265
class TestIcebergTableNameHelper:
233266
"""Tests for get_iceberg_table_name_from_metadata_path (Slice 3)."""

0 commit comments

Comments
 (0)