Skip to content

Commit aad8075

Browse files
committed
Merge branch 'main' of github.com:apache/iceberg-python into fd-rust
2 parents 6a7d88a + 1bec9cf commit aad8075

9 files changed

Lines changed: 209 additions & 12 deletions

File tree

mkdocs/docs/contributing.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,34 @@ make test-integration-rebuild
155155

156156
To rebuild the containers from scratch.
157157

158+
#### Running Integration Tests against REST Catalogs
159+
160+
!!! warning "Do not run against production catalogs"
161+
The integration tests will delete data throughout your entire catalog. Running these integration tests against a production catalog will result in data loss.
162+
163+
PyIceberg supports running its catalog tests against an arbitrary REST catalog.
164+
165+
To run the tests against your catalog, specify which REST catalog to use with the `PYICEBERG_TEST_CATALOG` environment variable.
166+
167+
```sh
168+
export PYICEBERG_TEST_CATALOG=test_catalog
169+
```
170+
171+
The catalog in question can be configured either through the `~/.pyiceberg.yaml` file or through environment variables.
172+
173+
```yaml
174+
catalog:
175+
test_catalog:
176+
uri: http://rest-catalog/ws/
177+
credential: t-1234:secret
178+
```
179+
180+
```sh
181+
export PYICEBERG_CATALOG__TEST_CATALOG__URI=http://rest-catalog/ws/
182+
export PYICEBERG_CATALOG__TEST_CATALOG__ACCESS_KEY_ID=username
183+
export PYICEBERG_CATALOG__TEST_CATALOG__SECRET_ACCESS_KEY=password
184+
```
185+
158186
## Code standards
159187

160188
Below are the formalized conventions that we adhere to in the PyIceberg project. The goal of this is to have a common agreement on how to evolve the codebase, but also using it as guidelines for newcomers to the project.

pyiceberg/io/fsspec.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
S3_ANONYMOUS,
7171
S3_CONNECT_TIMEOUT,
7272
S3_ENDPOINT,
73+
S3_FORCE_VIRTUAL_ADDRESSING,
7374
S3_PROXY_URI,
7475
S3_REGION,
7576
S3_REQUEST_TIMEOUT,
@@ -168,6 +169,9 @@ def _s3(properties: Properties) -> AbstractFileSystem:
168169
if request_timeout := properties.get(S3_REQUEST_TIMEOUT):
169170
config_kwargs["read_timeout"] = float(request_timeout)
170171

172+
if _force_virtual_addressing := properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
173+
config_kwargs["s3"] = {"addressing_style": "virtual"}
174+
171175
if s3_anonymous := properties.get(S3_ANONYMOUS):
172176
anon = strtobool(s3_anonymous)
173177
else:

pyiceberg/table/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ def overwrite(
569569
An overwrite may produce zero or more snapshots based on the operation:
570570
571571
- DELETE: In case existing Parquet files can be dropped completely.
572-
- REPLACE: In case existing Parquet files need to be rewritten.
572+
- OVERWRITE: In case existing Parquet files need to be rewritten to drop rows that match the overwrite filter.
573573
- APPEND: In case new data is being inserted into the table.
574574
575575
Args:
@@ -629,7 +629,7 @@ def delete(
629629
A delete may produce zero or more snapshots based on the operation:
630630
631631
- DELETE: In case existing Parquet files can be dropped completely.
632-
- REPLACE: In case existing Parquet files need to be rewritten
632+
- OVERWRITE: In case existing Parquet files need to be rewritten to drop rows that match the delete filter.
633633
634634
Args:
635635
delete_filter: A boolean expression to delete rows from a table
@@ -1396,7 +1396,7 @@ def overwrite(
13961396
An overwrite may produce zero or more snapshots based on the operation:
13971397
13981398
- DELETE: In case existing Parquet files can be dropped completely.
1399-
- REPLACE: In case existing Parquet files need to be rewritten.
1399+
- OVERWRITE: In case existing Parquet files need to be rewritten to drop rows that match the overwrite filter.
14001400
- APPEND: In case new data is being inserted into the table.
14011401
14021402
Args:

pyiceberg/table/update/__init__.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
193193
snapshot_id: int = Field(alias="snapshot-id")
194194

195195

196+
class RemovePartitionSpecsUpdate(IcebergBaseModel):
197+
action: Literal["remove-partition-specs"] = Field(default="remove-partition-specs")
198+
spec_ids: List[int] = Field(alias="spec-ids")
199+
200+
196201
class RemoveSchemasUpdate(IcebergBaseModel):
197202
action: Literal["remove-schemas"] = Field(default="remove-schemas")
198203
schema_ids: List[int] = Field(alias="schema-ids")
@@ -227,6 +232,7 @@ class RemovePartitionStatisticsUpdate(IcebergBaseModel):
227232
RemovePropertiesUpdate,
228233
SetStatisticsUpdate,
229234
RemoveStatisticsUpdate,
235+
RemovePartitionSpecsUpdate,
230236
RemoveSchemasUpdate,
231237
SetPartitionStatisticsUpdate,
232238
RemovePartitionStatisticsUpdate,
@@ -595,6 +601,21 @@ def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _Ta
595601
return base_metadata.model_copy(update={"statistics": statistics})
596602

597603

604+
@_apply_table_update.register(RemovePartitionSpecsUpdate)
605+
def _(update: RemovePartitionSpecsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
606+
for remove_spec_id in update.spec_ids:
607+
if not any(spec.spec_id == remove_spec_id for spec in base_metadata.partition_specs):
608+
raise ValueError(f"Partition spec with id {remove_spec_id} does not exist")
609+
610+
if base_metadata.default_spec_id in update.spec_ids:
611+
raise ValueError(f"Cannot remove default partition spec: {base_metadata.default_spec_id}")
612+
613+
partition_specs = [spec for spec in base_metadata.partition_specs if spec.spec_id not in update.spec_ids]
614+
615+
context.add_update(update)
616+
return base_metadata.model_copy(update={"partition_specs": partition_specs})
617+
618+
598619
@_apply_table_update.register(RemoveSchemasUpdate)
599620
def _(update: RemoveSchemasUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
600621
# This method should error if any schemas do not exist.

pyiceberg/table/update/snapshot.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -810,8 +810,13 @@ class ManageSnapshots(UpdateTableMetadata["ManageSnapshots"]):
810810
ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
811811
"""
812812

813-
_updates: Tuple[TableUpdate, ...] = ()
814-
_requirements: Tuple[TableRequirement, ...] = ()
813+
_updates: Tuple[TableUpdate, ...]
814+
_requirements: Tuple[TableRequirement, ...]
815+
816+
def __init__(self, transaction: Transaction) -> None:
817+
super().__init__(transaction)
818+
self._updates = ()
819+
self._requirements = ()
815820

816821
def _commit(self) -> UpdatesAndRequirements:
817822
"""Apply the pending changes and commit."""
@@ -924,9 +929,15 @@ class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]):
924929
Pending changes are applied on commit.
925930
"""
926931

927-
_snapshot_ids_to_expire: Set[int] = set()
928-
_updates: Tuple[TableUpdate, ...] = ()
929-
_requirements: Tuple[TableRequirement, ...] = ()
932+
_updates: Tuple[TableUpdate, ...]
933+
_requirements: Tuple[TableRequirement, ...]
934+
_snapshot_ids_to_expire: Set[int]
935+
936+
def __init__(self, transaction: Transaction) -> None:
937+
super().__init__(transaction)
938+
self._updates = ()
939+
self._requirements = ()
940+
self._snapshot_ids_to_expire = set()
930941

931942
def _commit(self) -> UpdatesAndRequirements:
932943
"""

tests/integration/test_catalog.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
import os
1819
from pathlib import Path, PosixPath
1920
from typing import Generator, List
2021

2122
import pytest
2223

23-
from pyiceberg.catalog import Catalog, MetastoreCatalog
24+
from pyiceberg.catalog import Catalog, MetastoreCatalog, load_catalog
2425
from pyiceberg.catalog.hive import HiveCatalog
2526
from pyiceberg.catalog.memory import InMemoryCatalog
2627
from pyiceberg.catalog.rest import RestCatalog
@@ -74,6 +75,16 @@ def rest_catalog() -> Generator[Catalog, None, None]:
7475
clean_up(test_catalog)
7576

7677

78+
@pytest.fixture(scope="function")
79+
def rest_test_catalog() -> Generator[Catalog, None, None]:
80+
if test_catalog_name := os.environ.get("PYICEBERG_TEST_CATALOG"):
81+
test_catalog = load_catalog(test_catalog_name)
82+
yield test_catalog
83+
clean_up(test_catalog)
84+
else:
85+
pytest.skip("PYICEBERG_TEST_CATALOG environment variable not set")
86+
87+
7788
@pytest.fixture(scope="function")
7889
def hive_catalog() -> Generator[Catalog, None, None]:
7990
test_catalog = HiveCatalog(
@@ -95,6 +106,7 @@ def hive_catalog() -> Generator[Catalog, None, None]:
95106
pytest.lazy_fixture("sqlite_catalog_file"),
96107
pytest.lazy_fixture("rest_catalog"),
97108
pytest.lazy_fixture("hive_catalog"),
109+
pytest.lazy_fixture("rest_test_catalog"),
98110
]
99111

100112

tests/io/test_fsspec.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,36 @@ def test_fsspec_s3_session_properties() -> None:
306306
)
307307

308308

309+
def test_fsspec_s3_session_properties_force_virtual_addressing() -> None:
310+
session_properties: Properties = {
311+
"s3.force-virtual-addressing": True,
312+
"s3.endpoint": "http://localhost:9000",
313+
"s3.access-key-id": "admin",
314+
"s3.secret-access-key": "password",
315+
"s3.region": "us-east-1",
316+
"s3.session-token": "s3.session-token",
317+
**UNIFIED_AWS_SESSION_PROPERTIES,
318+
}
319+
320+
with mock.patch("s3fs.S3FileSystem") as mock_s3fs:
321+
s3_fileio = FsspecFileIO(properties=session_properties)
322+
filename = str(uuid.uuid4())
323+
324+
s3_fileio.new_input(location=f"s3://warehouse/{filename}")
325+
326+
mock_s3fs.assert_called_with(
327+
anon=False,
328+
client_kwargs={
329+
"endpoint_url": "http://localhost:9000",
330+
"aws_access_key_id": "admin",
331+
"aws_secret_access_key": "password",
332+
"region_name": "us-east-1",
333+
"aws_session_token": "s3.session-token",
334+
},
335+
config_kwargs={"s3": {"addressing_style": "virtual"}},
336+
)
337+
338+
309339
def test_fsspec_s3_session_properties_with_anonymous() -> None:
310340
session_properties: Properties = {
311341
"s3.anonymous": "true",

tests/table/test_expire_snapshots.py

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
import datetime
18-
from unittest.mock import MagicMock
17+
import threading
18+
from datetime import datetime, timedelta
19+
from typing import Dict
20+
from unittest.mock import MagicMock, Mock
1921
from uuid import uuid4
2022

2123
import pytest
2224

2325
from pyiceberg.table import CommitTableResponse, Table
26+
from pyiceberg.table.update.snapshot import ExpireSnapshots
2427

2528

2629
def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None:
@@ -143,7 +146,7 @@ def test_expire_snapshots_by_timestamp_skips_protected(table_v2: Table) -> None:
143146
table_v2.catalog = MagicMock()
144147

145148
# Attempt to expire all snapshots before a future timestamp (so both are candidates)
146-
future_datetime = datetime.datetime.now() + datetime.timedelta(days=1)
149+
future_datetime = datetime.now() + timedelta(days=1)
147150

148151
# Mock the catalog's commit_table to return the current metadata (simulate no change)
149152
mock_response = CommitTableResponse(
@@ -223,3 +226,57 @@ def test_expire_snapshots_by_ids(table_v2: Table) -> None:
223226
assert EXPIRE_SNAPSHOT_1 not in remaining_snapshots
224227
assert EXPIRE_SNAPSHOT_2 not in remaining_snapshots
225228
assert len(table_v2.metadata.snapshots) == 1
229+
230+
231+
def test_thread_safety_fix() -> None:
232+
"""Test that ExpireSnapshots instances have isolated state."""
233+
# Create two ExpireSnapshots instances
234+
expire1 = ExpireSnapshots(Mock())
235+
expire2 = ExpireSnapshots(Mock())
236+
237+
# Verify they have separate snapshot sets (this was the bug!)
238+
# Before fix: both would have the same id (shared class attribute)
239+
# After fix: they should have different ids (separate instance attributes)
240+
assert id(expire1._snapshot_ids_to_expire) != id(expire2._snapshot_ids_to_expire), (
241+
"ExpireSnapshots instances are sharing the same snapshot set - thread safety bug still exists"
242+
)
243+
244+
# Test that modifications to one don't affect the other
245+
expire1._snapshot_ids_to_expire.add(1001)
246+
expire2._snapshot_ids_to_expire.add(2001)
247+
248+
# Verify no cross-contamination of snapshot IDs
249+
assert 2001 not in expire1._snapshot_ids_to_expire, "Snapshot IDs are leaking between instances"
250+
assert 1001 not in expire2._snapshot_ids_to_expire, "Snapshot IDs are leaking between instances"
251+
252+
253+
def test_concurrent_operations() -> None:
254+
"""Test concurrent operations with separate ExpireSnapshots instances."""
255+
results: Dict[str, set[int]] = {"expire1_snapshots": set(), "expire2_snapshots": set()}
256+
257+
def worker1() -> None:
258+
expire1 = ExpireSnapshots(Mock())
259+
expire1._snapshot_ids_to_expire.update([1001, 1002, 1003])
260+
results["expire1_snapshots"] = expire1._snapshot_ids_to_expire.copy()
261+
262+
def worker2() -> None:
263+
expire2 = ExpireSnapshots(Mock())
264+
expire2._snapshot_ids_to_expire.update([2001, 2002, 2003])
265+
results["expire2_snapshots"] = expire2._snapshot_ids_to_expire.copy()
266+
267+
# Run both workers concurrently
268+
thread1 = threading.Thread(target=worker1)
269+
thread2 = threading.Thread(target=worker2)
270+
271+
thread1.start()
272+
thread2.start()
273+
274+
thread1.join()
275+
thread2.join()
276+
277+
# Check for cross-contamination
278+
expected_1 = {1001, 1002, 1003}
279+
expected_2 = {2001, 2002, 2003}
280+
281+
assert results["expire1_snapshots"] == expected_1, "Worker 1 snapshots contaminated"
282+
assert results["expire2_snapshots"] == expected_2, "Worker 2 snapshots contaminated"

tests/table/test_init.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
)
6767
from pyiceberg.table.statistics import BlobMetadata, PartitionStatisticsFile, StatisticsFile
6868
from pyiceberg.table.update import (
69+
AddPartitionSpecUpdate,
6970
AddSnapshotUpdate,
7071
AddSortOrderUpdate,
7172
AssertCreate,
@@ -76,6 +77,7 @@
7677
AssertLastAssignedPartitionId,
7778
AssertRefSnapshotId,
7879
AssertTableUUID,
80+
RemovePartitionSpecsUpdate,
7981
RemovePartitionStatisticsUpdate,
8082
RemovePropertiesUpdate,
8183
RemoveSchemasUpdate,
@@ -1294,6 +1296,38 @@ def test_update_metadata_log_overflow(table_v2: Table) -> None:
12941296
assert len(new_metadata.metadata_log) == 1
12951297

12961298

1299+
def test_remove_partition_spec_update(table_v2: Table) -> None:
1300+
base_metadata = table_v2.metadata
1301+
new_spec = PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="y"), spec_id=1)
1302+
metadata_with_new_spec = update_table_metadata(base_metadata, (AddPartitionSpecUpdate(spec=new_spec),))
1303+
1304+
assert len(metadata_with_new_spec.partition_specs) == 2
1305+
1306+
update = RemovePartitionSpecsUpdate(spec_ids=[1])
1307+
updated_metadata = update_table_metadata(
1308+
metadata_with_new_spec,
1309+
(update,),
1310+
)
1311+
1312+
assert len(updated_metadata.partition_specs) == 1
1313+
1314+
1315+
def test_remove_partition_spec_update_spec_does_not_exist(table_v2: Table) -> None:
1316+
update = RemovePartitionSpecsUpdate(
1317+
spec_ids=[123],
1318+
)
1319+
with pytest.raises(ValueError, match="Partition spec with id 123 does not exist"):
1320+
update_table_metadata(table_v2.metadata, (update,))
1321+
1322+
1323+
def test_remove_partition_spec_update_default_spec(table_v2: Table) -> None:
1324+
update = RemovePartitionSpecsUpdate(
1325+
spec_ids=[0],
1326+
)
1327+
with pytest.raises(ValueError, match="Cannot remove default partition spec: 0"):
1328+
update_table_metadata(table_v2.metadata, (update,))
1329+
1330+
12971331
def test_remove_schemas_update(table_v2: Table) -> None:
12981332
base_metadata = table_v2.metadata
12991333
assert len(base_metadata.schemas) == 2

0 commit comments

Comments
 (0)