Skip to content

Commit 1126a6e

Browse files
author
Tom McCormick
committed
Add comprehensive ORC read support to PyArrow I/O
Features implemented:
- Record batching and table reading via ArrowScan
- Column projection and row filtering with predicate pushdown
- Positional deletes support (with ORC-specific non-dictionary handling)
- Schema mapping for files without field IDs
- Streaming via Iterator[pa.RecordBatch] for memory efficiency
- Full integration with Iceberg metadata and partitioning
1 parent 52d810e commit 1126a6e

5 files changed

Lines changed: 573 additions & 25 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1011,6 +1011,10 @@ def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expressi
10111011
def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat:
10121012
if file_format == FileFormat.PARQUET:
10131013
return ds.ParquetFileFormat(**kwargs)
1014+
elif file_format == FileFormat.ORC:
1015+
# ORC doesn't support pre_buffer and buffer_size parameters
1016+
orc_kwargs = {k: v for k, v in kwargs.items() if k not in ['pre_buffer', 'buffer_size']}
1017+
return ds.OrcFileFormat(**orc_kwargs)
10141018
else:
10151019
raise ValueError(f"Unsupported file format: {file_format}")
10161020

@@ -1027,6 +1031,15 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]
10271031
file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
10281032
for file in table.column("file_path").chunks[0].dictionary
10291033
}
1034+
elif data_file.file_format == FileFormat.ORC:
1035+
with io.new_input(data_file.file_path).open() as fi:
1036+
delete_fragment = _get_file_format(data_file.file_format).make_fragment(fi)
1037+
table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
1038+
# For ORC, file_path columns are not dictionary-encoded, so we use unique() directly
1039+
return {
1040+
path.as_py(): table.filter(pc.field("file_path") == path).column("pos")
1041+
for path in table.column("file_path").unique()
1042+
}
10301043
elif data_file.file_format == FileFormat.PUFFIN:
10311044
with io.new_input(data_file.file_path).open() as fi:
10321045
payload = fi.read()
@@ -1495,7 +1508,7 @@ def _task_to_record_batches(
14951508
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
14961509
downcast_ns_timestamp_to_us: Optional[bool] = None,
14971510
) -> Iterator[pa.RecordBatch]:
1498-
arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
1511+
arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
14991512
with io.new_input(task.file.file_path).open() as fin:
15001513
fragment = arrow_format.make_fragment(fin)
15011514
physical_schema = fragment.physical_schema

pyiceberg/table/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,9 @@ class TableProperties:
211211
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True
212212

213213
WRITE_DATA_PATH = "write.data.path"
214+
215+
WRITE_FILE_FORMAT = "write.format.default"
216+
WRITE_FILE_FORMAT_DEFAULT = "parquet"
214217
WRITE_METADATA_PATH = "write.metadata.path"
215218

216219
DELETE_MODE = "write.delete.mode"

tests/conftest.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2413,6 +2413,32 @@ def example_task(data_file: str) -> FileScanTask:
24132413
)
24142414

24152415

2416+
@pytest.fixture
2417+
def data_file_orc(table_schema_simple: Schema, tmp_path: str) -> str:
2418+
import pyarrow as pa
2419+
import pyarrow.orc as orc
2420+
2421+
from pyiceberg.io.pyarrow import schema_to_pyarrow
2422+
2423+
table = pa.table(
2424+
{"foo": ["a", "b", "c"], "bar": [1, 2, 3], "baz": [True, False, None]},
2425+
schema=schema_to_pyarrow(table_schema_simple),
2426+
)
2427+
2428+
file_path = f"{tmp_path}/0000-data.orc"
2429+
orc.write_table(table=table, where=file_path)
2430+
return file_path
2431+
2432+
2433+
@pytest.fixture
2434+
def example_task_orc(data_file_orc: str) -> FileScanTask:
2435+
datafile = DataFile.from_args(file_path=data_file_orc, file_format=FileFormat.ORC, file_size_in_bytes=1925)
2436+
datafile.spec_id = 0
2437+
return FileScanTask(
2438+
data_file=datafile,
2439+
)
2440+
2441+
24162442
@pytest.fixture(scope="session")
24172443
def warehouse(tmp_path_factory: pytest.TempPathFactory) -> Path:
24182444
return tmp_path_factory.mktemp("test_sql")
@@ -2442,6 +2468,23 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table:
24422468
)
24432469

24442470

2471+
@pytest.fixture
2472+
def table_v2_orc(example_table_metadata_v2: Dict[str, Any]) -> Table:
2473+
import copy
2474+
metadata_dict = copy.deepcopy(example_table_metadata_v2)
2475+
if not metadata_dict["properties"]:
2476+
metadata_dict["properties"] = {}
2477+
metadata_dict["properties"]["write.format.default"] = "ORC"
2478+
table_metadata = TableMetadataV2(**metadata_dict)
2479+
return Table(
2480+
identifier=("database", "table_orc"),
2481+
metadata=table_metadata,
2482+
metadata_location=f"{table_metadata.location}/uuid.metadata.json",
2483+
io=load_file_io(),
2484+
catalog=NoopCatalog("NoopCatalog"),
2485+
)
2486+
2487+
24452488
@pytest.fixture
24462489
def table_v2_with_fixed_and_decimal_types(
24472490
table_metadata_v2_with_fixed_and_decimal_types: Dict[str, Any],

0 commit comments

Comments (0)