Commit 9f8b7b7

fix: Cast smaller integer types to int32/int64 on write for Spark compatibility
When writing PyArrow tables with smaller integer types (uint8, int8, int16, uint16) to Iceberg tables with IntegerType columns, PyIceberg preserves the original Arrow type in the Parquet file. This causes Spark to fail with:

    java.lang.UnsupportedOperationException: Unsupported logical type: UINT_8

The fix casts smaller integer types to their canonical Iceberg representation (int32 for IntegerType, int64 for LongType) during write, ensuring cross-platform compatibility. Only widening conversions are allowed; narrowing conversions (e.g., int64 to int32) continue to be rejected via the existing promote() function.

Closes #2791
1 parent 65ba595 · commit 9f8b7b7
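As a standalone sketch of the rule the commit implements (not PyIceberg's actual code path; widen_if_needed is a hypothetical helper introduced here for illustration):

    import pyarrow as pa

    def widen_if_needed(values: pa.Array, target: pa.DataType) -> pa.Array:
        # Widen smaller integer arrays (e.g. uint8, int16) to the target
        # width (int32 for IntegerType, int64 for LongType).
        # Equal or larger source widths pass through unchanged; in PyIceberg,
        # narrowing is then rejected by the existing promote() handling.
        if pa.types.is_integer(values.type) and pa.types.is_integer(target):
            if values.type.bit_width < target.bit_width:
                return values.cast(target)
        return values

    arr = pa.array([1, 2, None], type=pa.uint8())
    print(widen_if_needed(arr, pa.int32()).type)  # int32
    print(widen_if_needed(arr, pa.uint8()).type)  # uint8 (already wide enough)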

2 files changed: 41 additions & 0 deletions

pyiceberg/io/pyarrow.py

Lines changed: 9 additions & 0 deletions
@@ -1903,6 +1903,15 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
                 elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
                     return values.cast(target_type)
             raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
+        elif isinstance(field.field_type, (IntegerType, LongType)):
+            # Cast smaller integer types to target type for cross-platform compatibility
+            # Only allow widening conversions (smaller bit width to larger)
+            # Narrowing conversions fall through to promote() handling below
+            if pa.types.is_integer(values.type):
+                source_width = values.type.bit_width
+                target_width = target_type.bit_width
+                if source_width < target_width:
+                    return values.cast(target_type)
 
         if field.field_type != file_field.field_type:
             target_schema = schema_to_pyarrow(
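The comparison above relies on PyArrow's fixed-width integer types exposing a bit_width attribute; a quick standalone check (plain PyArrow, nothing from this patch):

    import pyarrow as pa

    # Signed and unsigned variants of the same width report the same value,
    # so the widening check treats them as equal in size.
    for t in (pa.uint8(), pa.int8(), pa.uint16(), pa.int16(), pa.uint32(), pa.int32(), pa.int64()):
        print(t, t.bit_width)  # 8, 8, 16, 16, 32, 32, 64

Since uint32 and int32 both report a bit width of 32, a uint32 column is not widened when the target is IntegerType; it falls through to the promote() handling below, matching the test matrix where uint32 only pairs with LongType.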

tests/io/test_pyarrow.py

Lines changed: 32 additions & 0 deletions
@@ -2716,6 +2716,38 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception(
     assert "Unsupported schema projection from timestamp[ns] to timestamp[us]" in str(exc_info.value)
 
 
+@pytest.mark.parametrize(
+    "arrow_type,iceberg_type,expected_arrow_type",
+    [
+        (pa.uint8(), IntegerType(), pa.int32()),
+        (pa.int8(), IntegerType(), pa.int32()),
+        (pa.int16(), IntegerType(), pa.int32()),
+        (pa.uint16(), IntegerType(), pa.int32()),
+        (pa.uint32(), LongType(), pa.int64()),
+        (pa.int32(), LongType(), pa.int64()),
+    ],
+)
+def test__to_requested_schema_integer_promotion(
+    arrow_type: pa.DataType,
+    iceberg_type: PrimitiveType,
+    expected_arrow_type: pa.DataType,
+) -> None:
+    """Test that smaller integer types are cast to target Iceberg type during write."""
+    requested_schema = Schema(NestedField(1, "col", iceberg_type, required=False))
+    file_schema = requested_schema
+
+    arrow_schema = pa.schema([pa.field("col", arrow_type)])
+    data = pa.array([1, 2, 3, None], type=arrow_type)
+    batch = pa.RecordBatch.from_arrays([data], schema=arrow_schema)
+
+    result = _to_requested_schema(
+        requested_schema, file_schema, batch, downcast_ns_timestamp_to_us=False, include_field_ids=False
+    )
+
+    assert result.schema[0].type == expected_arrow_type
+    assert result.column(0).to_pylist() == [1, 2, 3, None]
+
+
 def test_pyarrow_file_io_fs_by_scheme_cache() -> None:
     # It's better to set up multi-region minio servers for an integration test once `endpoint_url` argument becomes available for `resolve_s3_region`
     # Refer to: https://github.com/apache/arrow/issues/43713
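For background on the failure the commit message describes, a minimal repro of the write side (assumes only PyArrow; the file path is illustrative) shows the unsigned logical type surviving a Parquet round trip, which is what Spark then rejects as UINT_8:

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Writing a uint8 column straight to Parquet preserves the unsigned
    # 8-bit logical type instead of a plain 32-bit integer.
    table = pa.table({"col": pa.array([1, 2, 3], type=pa.uint8())})
    pq.write_table(table, "/tmp/uint8_demo.parquet")
    print(pq.read_schema("/tmp/uint8_demo.parquet"))  # col: uint8

With this fix, the same values written through PyIceberg into an IntegerType column land in the file as int32, which is the cross-platform representation the commit targets.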
