
Commit 71143f6

Tom McCormick committed
Add comprehensive ORC field ID support and testing
- Add ORC_FIELD_ID_KEY constant for ORC field ID metadata
- Update _get_field_id function to support both Parquet and ORC field IDs
- Update schema_to_pyarrow to accept a file_format parameter for proper field ID handling
- Update _ConvertToArrowSchema to add the correct field IDs based on file format
- Add comprehensive test coverage:
  * test_orc_field_id_extraction: Tests field ID extraction from PyArrow metadata
  * test_orc_schema_with_field_ids: Tests ORC reading with embedded field IDs (no name mapping needed)
  * test_orc_schema_conversion_with_field_ids: Tests schema conversion with ORC field IDs

These changes fix the original error where ORC files with field IDs couldn't be read without name mapping, and provide comprehensive test coverage to prevent regression.
1 parent 1126a6e commit 71143f6
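Editorial note: a minimal sketch (not part of the commit) of the round-trip these changes enable, using only names introduced or touched in the diffs below (ORC_FIELD_ID_KEY, the new file_format parameter of schema_to_pyarrow, and the extended _get_field_id):

# Editorial sketch: convert an Iceberg schema to Arrow with ORC field IDs,
# then resolve the IDs back from the Arrow field metadata.
from pyiceberg.io.pyarrow import ORC_FIELD_ID_KEY, _get_field_id, schema_to_pyarrow
from pyiceberg.manifest import FileFormat
from pyiceberg.schema import NestedField, Schema
from pyiceberg.types import IntegerType, StringType

schema = Schema(
    NestedField(1, "id", IntegerType(), required=True),
    NestedField(2, "name", StringType(), required=False),
)

# file_format=FileFormat.ORC makes the conversion write the ORC key (b"iceberg.id");
# omitting it keeps the old Parquet-only behavior.
arrow_schema = schema_to_pyarrow(schema, file_format=FileFormat.ORC)
assert arrow_schema.field("id").metadata[ORC_FIELD_ID_KEY] == b"1"

# _get_field_id now falls back to the ORC key when no Parquet key is present.
assert _get_field_id(arrow_schema.field("name")) == 2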

2 files changed

Lines changed: 231 additions & 8 deletions


pyiceberg/io/pyarrow.py

Lines changed: 29 additions & 8 deletions
@@ -201,6 +201,8 @@
 ICEBERG_SCHEMA = b"iceberg.schema"
 # The PARQUET: in front means that it is Parquet specific, in this case the field_id
 PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
+# ORC field ID key for Iceberg field IDs in ORC metadata
+ORC_FIELD_ID_KEY = b"iceberg.id"
 PYARROW_FIELD_DOC_KEY = b"doc"
 LIST_ELEMENT_NAME = "element"
 MAP_KEY_NAME = "key"

@@ -690,16 +692,18 @@ def schema_to_pyarrow(
     schema: Union[Schema, IcebergType],
     metadata: Dict[bytes, bytes] = EMPTY_DICT,
     include_field_ids: bool = True,
+    file_format: Optional[FileFormat] = None,
 ) -> pa.schema:
-    return visit(schema, _ConvertToArrowSchema(metadata, include_field_ids))
+    return visit(schema, _ConvertToArrowSchema(metadata, include_field_ids, file_format))


 class _ConvertToArrowSchema(SchemaVisitorPerPrimitiveType[pa.DataType]):
     _metadata: Dict[bytes, bytes]

-    def __init__(self, metadata: Dict[bytes, bytes] = EMPTY_DICT, include_field_ids: bool = True) -> None:
+    def __init__(self, metadata: Dict[bytes, bytes] = EMPTY_DICT, include_field_ids: bool = True, file_format: Optional[FileFormat] = None) -> None:
         self._metadata = metadata
         self._include_field_ids = include_field_ids
+        self._file_format = file_format

     def schema(self, _: Schema, struct_result: pa.StructType) -> pa.schema:
         return pa.schema(list(struct_result), metadata=self._metadata)

@@ -712,7 +716,12 @@ def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field:
         if field.doc:
             metadata[PYARROW_FIELD_DOC_KEY] = field.doc
         if self._include_field_ids:
-            metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id)
+            # Add field ID based on file format
+            if self._file_format == FileFormat.ORC:
+                metadata[ORC_FIELD_ID_KEY] = str(field.field_id)
+            else:
+                # Default to Parquet for backward compatibility
+                metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id)

         return pa.field(
             name=field.name,

@@ -1241,11 +1250,21 @@ def primitive(self, primitive: pa.DataType) -> T:


 def _get_field_id(field: pa.Field) -> Optional[int]:
-    return (
-        int(field_id_str.decode())
-        if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)))
-        else None
-    )
+    """Return the Iceberg field ID from Parquet or ORC metadata if available."""
+    if not field.metadata:
+        return None
+
+    # Try Parquet field ID first
+    field_id_bytes = field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY)
+    if field_id_bytes:
+        return int(field_id_bytes.decode())
+
+    # Fallback: try ORC field ID
+    field_id_bytes = field.metadata.get(ORC_FIELD_ID_KEY)
+    if field_id_bytes:
+        return int(field_id_bytes.decode())
+
+    return None


 class _HasIds(PyArrowSchemaVisitor[bool]):

@@ -1858,6 +1877,8 @@ def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field:
         if field.doc:
             metadata[PYARROW_FIELD_DOC_KEY] = field.doc
         if self._include_field_ids:
+            # For projection visitor, we don't know the file format, so default to Parquet
+            # This is used for schema conversion during reads, not writes
            metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id)

         return pa.field(
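When a field carries both metadata keys, the Parquet ID wins, because the rewritten _get_field_id checks PYARROW_PARQUET_FIELD_ID_KEY before ORC_FIELD_ID_KEY. A small editorial illustration of that precedence (the same behavior is asserted by Test 4 in the test file below):

import pyarrow as pa

from pyiceberg.io.pyarrow import ORC_FIELD_ID_KEY, PYARROW_PARQUET_FIELD_ID_KEY, _get_field_id

# Field tagged with both a Parquet and an ORC field ID in its metadata.
field_both = pa.field(
    "example",
    pa.string(),
    metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"123", ORC_FIELD_ID_KEY: b"456"},
)
assert _get_field_id(field_both) == 123  # the Parquet key takes precedence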

tests/io/test_pyarrow.py

Lines changed: 202 additions & 0 deletions
@@ -3290,3 +3290,205 @@ def test_orc_record_batching_streaming(tmp_path):
     for batch in batches:
         ids = batch.column("id").to_pylist()
         assert all(id_val > 500 for id_val in ids), f"Found ID <= 500: {ids}"
+
+
+def test_orc_field_id_extraction():
+    """
+    Test ORC field ID extraction from PyArrow field metadata.
+    To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_orc_field_id_extraction
+    """
+    import pyarrow as pa
+    from pyiceberg.io.pyarrow import _get_field_id, ORC_FIELD_ID_KEY, PYARROW_PARQUET_FIELD_ID_KEY
+
+    # Test 1: Parquet field ID extraction
+    field_parquet = pa.field('test_parquet', pa.string(), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b'123'})
+    field_id = _get_field_id(field_parquet)
+    assert field_id == 123, f"Expected Parquet field ID 123, got {field_id}"
+
+    # Test 2: ORC field ID extraction
+    field_orc = pa.field('test_orc', pa.string(), metadata={ORC_FIELD_ID_KEY: b'456'})
+    field_id = _get_field_id(field_orc)
+    assert field_id == 456, f"Expected ORC field ID 456, got {field_id}"
+
+    # Test 3: No field ID
+    field_no_id = pa.field('test_no_id', pa.string())
+    field_id = _get_field_id(field_no_id)
+    assert field_id is None, f"Expected None for field without ID, got {field_id}"
+
+    # Test 4: Both field IDs present (should prefer Parquet)
+    field_both = pa.field('test_both', pa.string(), metadata={
+        PYARROW_PARQUET_FIELD_ID_KEY: b'123',
+        ORC_FIELD_ID_KEY: b'456'
+    })
+    field_id = _get_field_id(field_both)
+    assert field_id == 123, f"Expected Parquet field ID 123 (preferred), got {field_id}"
+
+    # Test 5: Empty metadata
+    field_empty_metadata = pa.field('test_empty', pa.string(), metadata={})
+    field_id = _get_field_id(field_empty_metadata)
+    assert field_id is None, f"Expected None for field with empty metadata, got {field_id}"
+
+    # Test 6: Invalid field ID format
+    field_invalid = pa.field('test_invalid', pa.string(), metadata={ORC_FIELD_ID_KEY: b'not_a_number'})
+    try:
+        field_id = _get_field_id(field_invalid)
+        assert False, "Expected ValueError for invalid field ID format"
+    except ValueError:
+        pass  # Expected behavior
+
+    # Test 7: Different data types
+    field_int = pa.field('test_int', pa.int32(), metadata={ORC_FIELD_ID_KEY: b'789'})
+    field_id = _get_field_id(field_int)
+    assert field_id == 789, f"Expected ORC field ID 789 for int field, got {field_id}"
+
+    field_bool = pa.field('test_bool', pa.bool_(), metadata={ORC_FIELD_ID_KEY: b'101'})
+    field_id = _get_field_id(field_bool)
+    assert field_id == 101, f"Expected ORC field ID 101 for bool field, got {field_id}"
+
+
+def test_orc_schema_with_field_ids(tmp_path):
+    """
+    Test ORC reading with actual field IDs embedded in the schema.
+    This test creates an ORC file with field IDs and reads it without name mapping.
+    To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_orc_schema_with_field_ids
+    """
+    import pyarrow as pa
+    import pyarrow.orc as orc
+    from pyiceberg.schema import Schema, NestedField
+    from pyiceberg.types import IntegerType, StringType
+    from pyiceberg.manifest import FileFormat, DataFileContent
+    from pyiceberg.table.metadata import TableMetadataV2
+    from pyiceberg.partitioning import PartitionSpec
+    from pyiceberg.io.pyarrow import PyArrowFileIO, ArrowScan, schema_to_pyarrow, ORC_FIELD_ID_KEY
+    from pyiceberg.table import FileScanTask
+    from pyiceberg.expressions import AlwaysTrue
+
+    # Define schema
+    schema = Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(2, "name", StringType(), required=False),
+    )
+
+    # Create PyArrow schema with ORC field IDs
+    arrow_schema = pa.schema([
+        pa.field("id", pa.int32(), metadata={ORC_FIELD_ID_KEY: b"1"}),
+        pa.field("name", pa.string(), metadata={ORC_FIELD_ID_KEY: b"2"})
+    ])
+
+    # Create data with the schema that has field IDs
+    data = pa.table({
+        "id": pa.array([1, 2, 3], type=pa.int32()),
+        "name": ["alice", "bob", "charlie"]
+    }, schema=arrow_schema)
+
+    # Create ORC file
+    orc_path = tmp_path / "field_id_test.orc"
+    orc.write_table(data, str(orc_path))
+
+    # Create table metadata WITHOUT name mapping (should work with field IDs)
+    table_metadata = TableMetadataV2(
+        location=f"file://{tmp_path}/test_location",
+        last_column_id=2,
+        format_version=2,
+        schemas=[schema],
+        partition_specs=[PartitionSpec()],
+        properties={
+            # No name mapping - should work with field IDs
+        }
+    )
+    io = PyArrowFileIO()
+
+    # Create DataFile
+    from pyiceberg.manifest import DataFile
+    from pyiceberg.typedef import Record
+    data_file = DataFile.from_args(
+        content=DataFileContent.DATA,
+        file_path=str(orc_path),
+        file_format=FileFormat.ORC,
+        partition=Record(),
+        file_size_in_bytes=orc_path.stat().st_size,
+        sort_order_id=None,
+        spec_id=0,
+        equality_ids=None,
+        key_metadata=None,
+        record_count=3,
+        column_sizes={1: 12, 2: 30},
+        value_counts={1: 3, 2: 3},
+        null_value_counts={1: 0, 2: 0},
+        nan_value_counts={1: 0, 2: 0},
+        lower_bounds={1: b"\x01\x00\x00\x00", 2: b"alice"},
+        upper_bounds={1: b"\x03\x00\x00\x00", 2: b"charlie"},
+        split_offsets=None,
+    )
+    data_file.spec_id = 0
+
+    # Read back using ArrowScan - should work without name mapping
+    scan = ArrowScan(
+        table_metadata=table_metadata,
+        io=io,
+        projected_schema=schema,
+        row_filter=AlwaysTrue(),
+        case_sensitive=True,
+    )
+    scan_task = FileScanTask(data_file=data_file)
+    table_read = scan.to_table([scan_task])
+
+    # Verify the data was read correctly
+    assert table_read.num_rows == 3
+    assert table_read.num_columns == 2
+    assert table_read.column_names == ["id", "name"]
+
+    # Verify data matches
+    assert table_read.column("id").to_pylist() == [1, 2, 3]
+    assert table_read.column("name").to_pylist() == ["alice", "bob", "charlie"]
+
+
+def test_orc_schema_conversion_with_field_ids():
+    """
+    Test that schema_to_pyarrow correctly adds ORC field IDs when file_format is specified.
+    To run just this test:
+    pytest tests/io/test_pyarrow.py -k test_orc_schema_conversion_with_field_ids
+    """
+    from pyiceberg.schema import Schema, NestedField
+    from pyiceberg.types import IntegerType, StringType
+    from pyiceberg.manifest import FileFormat
+    from pyiceberg.io.pyarrow import schema_to_pyarrow, ORC_FIELD_ID_KEY, PYARROW_PARQUET_FIELD_ID_KEY
+
+    # Define schema
+    schema = Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(2, "name", StringType(), required=False),
+    )
+
+    # Test 1: Default behavior (should add Parquet field IDs)
+    arrow_schema_default = schema_to_pyarrow(schema, include_field_ids=True)
+
+    id_field = arrow_schema_default.field(0)  # id field
+    name_field = arrow_schema_default.field(1)  # name field
+
+    assert id_field.metadata[PYARROW_PARQUET_FIELD_ID_KEY] == b"1"
+    assert name_field.metadata[PYARROW_PARQUET_FIELD_ID_KEY] == b"2"
+    assert ORC_FIELD_ID_KEY not in id_field.metadata
+    assert ORC_FIELD_ID_KEY not in name_field.metadata
+
+    # Test 2: Explicitly specify ORC format
+    arrow_schema_orc = schema_to_pyarrow(schema, include_field_ids=True, file_format=FileFormat.ORC)
+
+    id_field_orc = arrow_schema_orc.field(0)  # id field
+    name_field_orc = arrow_schema_orc.field(1)  # name field
+
+    assert id_field_orc.metadata[ORC_FIELD_ID_KEY] == b"1"
+    assert name_field_orc.metadata[ORC_FIELD_ID_KEY] == b"2"
+    assert PYARROW_PARQUET_FIELD_ID_KEY not in id_field_orc.metadata
+    assert PYARROW_PARQUET_FIELD_ID_KEY not in name_field_orc.metadata
+
+    # Test 3: No field IDs
+    arrow_schema_no_ids = schema_to_pyarrow(schema, include_field_ids=False, file_format=FileFormat.ORC)
+
+    id_field_no_ids = arrow_schema_no_ids.field(0)
+    name_field_no_ids = arrow_schema_no_ids.field(1)
+
+    assert not id_field_no_ids.metadata
+    assert not name_field_no_ids.metadata
