Skip to content

Commit c73f310

Browse files
committed
disable write path and add tests
1 parent 9f637fb commit c73f310

2 files changed

Lines changed: 81 additions & 3 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1656,6 +1656,7 @@ def _task_to_record_batches(
16561656
current_batch,
16571657
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
16581658
projected_missing_fields=projected_missing_fields,
1659+
allow_timestamp_tz_mismatch=True,
16591660
)
16601661

16611662

@@ -1849,13 +1850,18 @@ def _to_requested_schema(
18491850
downcast_ns_timestamp_to_us: bool = False,
18501851
include_field_ids: bool = False,
18511852
projected_missing_fields: dict[int, Any] = EMPTY_DICT,
1853+
allow_timestamp_tz_mismatch: bool = False,
18521854
) -> pa.RecordBatch:
18531855
# We could reuse some of these visitors
18541856
struct_array = visit_with_partner(
18551857
requested_schema,
18561858
batch,
18571859
ArrowProjectionVisitor(
1858-
file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
1860+
file_schema,
1861+
downcast_ns_timestamp_to_us,
1862+
include_field_ids,
1863+
projected_missing_fields=projected_missing_fields,
1864+
allow_timestamp_tz_mismatch=allow_timestamp_tz_mismatch,
18591865
),
18601866
ArrowAccessor(file_schema),
18611867
)
@@ -1868,6 +1874,7 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, pa.Array | None]
18681874
_downcast_ns_timestamp_to_us: bool
18691875
_use_large_types: bool | None
18701876
_projected_missing_fields: dict[int, Any]
1877+
_allow_timestamp_tz_mismatch: bool
18711878

18721879
def __init__(
18731880
self,
@@ -1876,12 +1883,16 @@ def __init__(
18761883
include_field_ids: bool = False,
18771884
use_large_types: bool | None = None,
18781885
projected_missing_fields: dict[int, Any] = EMPTY_DICT,
1886+
allow_timestamp_tz_mismatch: bool = False,
18791887
) -> None:
18801888
self._file_schema = file_schema
18811889
self._include_field_ids = include_field_ids
18821890
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
18831891
self._use_large_types = use_large_types
18841892
self._projected_missing_fields = projected_missing_fields
1893+
# Allow reading timestamp with/without timezone interchangeably (aligns with Spark behavior)
1894+
# This is intentionally disabled on the write path to enforce the Iceberg spec distinction
1895+
self._allow_timestamp_tz_mismatch = allow_timestamp_tz_mismatch
18851896

18861897
if use_large_types is not None:
18871898
deprecation_message(
@@ -1896,11 +1907,14 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
18961907
if field.field_type.is_primitive:
18971908
if (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
18981909
if field.field_type == TimestampType():
1910+
source_tz_compatible = values.type.tz is None or (
1911+
self._allow_timestamp_tz_mismatch and values.type.tz in UTC_ALIASES
1912+
)
18991913
if (
19001914
pa.types.is_timestamp(target_type)
19011915
and not target_type.tz
19021916
and pa.types.is_timestamp(values.type)
1903-
and (values.type.tz in UTC_ALIASES or values.type.tz is None)
1917+
and source_tz_compatible
19041918
):
19051919
# Downcasting of nanoseconds to microseconds
19061920
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:

tests/io/test_pyarrow.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
expression_to_pyarrow,
8282
parquet_path_to_id_mapping,
8383
schema_to_pyarrow,
84+
write_file,
8485
)
8586
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
8687
from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -2744,7 +2745,10 @@ def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None:
27442745
# table schema expects timestamp without timezone
27452746
table_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
27462747

2747-
actual_result = _to_requested_schema(table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True)
2748+
# allow_timestamp_tz_mismatch=True enables reading timestamptz as timestamp
2749+
actual_result = _to_requested_schema(
2750+
table_schema, file_schema, batch, downcast_ns_timestamp_to_us=True, allow_timestamp_tz_mismatch=True
2751+
)
27482752
expected = pa.record_batch(
27492753
[
27502754
pa.array(
@@ -2762,6 +2766,66 @@ def test__to_requested_schema_timestamptz_to_timestamp_projection() -> None:
27622766
assert expected.equals(actual_result)
27632767

27642768

2769+
def test__to_requested_schema_timestamptz_to_timestamp_write_rejects() -> None:
    """Test that the write path (default) rejects timestamptz to timestamp casting.

    Enforces the Iceberg spec distinction between timestamp and timestamptz on writes,
    while the read path may opt into Spark-like leniency via allow_timestamp_tz_mismatch=True.
    """
    # The data file carries timezone-aware timestamps.
    source_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))
    aware_values = [
        datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc),
        datetime(2025, 8, 14, 13, 0, 0, tzinfo=timezone.utc),
    ]
    record_batch = pa.record_batch(
        [pa.array(aware_values, type=pa.timestamp("us", tz="UTC"))],
        names=["ts_field"],
    )

    # The table schema asks for timezone-naive timestamps.
    requested_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))

    # With the flag off (the default, as used on the write path) the projection must be refused.
    with pytest.raises(ValueError, match="Unsupported schema projection"):
        _to_requested_schema(
            requested_schema,
            source_schema,
            record_batch,
            downcast_ns_timestamp_to_us=True,
            allow_timestamp_tz_mismatch=False,
        )
2798+
2799+
2800+
def test_write_file_rejects_timestamptz_to_timestamp(tmp_path: Path) -> None:
    """Test that write_file rejects writing timestamptz data to a timestamp column."""
    from pyiceberg.table import WriteTask

    # The table declares a timezone-naive column, but the task data is timezone-aware.
    naive_schema = Schema(NestedField(1, "ts_field", TimestampType(), required=False))
    aware_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), required=False))

    arrow_data = pa.table({"ts_field": [datetime(2025, 8, 14, 12, 0, 0, tzinfo=timezone.utc)]})

    metadata = TableMetadataV2(
        location=f"file://{tmp_path}",
        last_column_id=1,
        format_version=2,
        schemas=[naive_schema],
        partition_specs=[PartitionSpec()],
    )

    write_task = WriteTask(
        write_uuid=uuid.uuid4(),
        task_id=0,
        record_batches=arrow_data.to_batches(),
        schema=aware_schema,
    )

    # The write path keeps the spec-mandated timestamp/timestamptz distinction strict.
    with pytest.raises(ValueError, match="Unsupported schema projection"):
        list(write_file(io=PyArrowFileIO(), table_metadata=metadata, tasks=iter([write_task])))
2827+
2828+
27652829
def test__to_requested_schema_timestamps(
27662830
arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
27672831
arrow_table_with_all_timestamp_precisions: pa.Table,

0 commit comments

Comments
 (0)