
Commit 03bda3d

sumedhsakdeo and claude committed
refactor: Replace ScanOrder enum with class hierarchy
Replace ScanOrder(str, Enum) with:
- ScanOrder(ABC): Base class for ordering strategies
- TaskOrder(ScanOrder): Default behavior, preserves existing API
- ArrivalOrder(ScanOrder): Encapsulates concurrent_streams and max_buffered_batches

This addresses reviewer feedback about unused parameters and improves type safety. Parameters are now scoped to their appropriate ordering mode.

Rename concurrent_files → concurrent_streams for clarity.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent afb244c commit 03bda3d

2 files changed

Lines changed: 53 additions & 46 deletions
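In user-facing terms, the migration looks roughly like this (a sketch; `tbl` is a placeholder for any loaded `Table`):

```python
# Before: enum value plus a loose parameter that only applied to ARRIVAL mode
reader = tbl.scan().to_arrow_batch_reader(order=ScanOrder.ARRIVAL, concurrent_files=4)

# After: arrival-mode parameters live on the strategy object that uses them
reader = tbl.scan().to_arrow_batch_reader(order=ArrivalOrder(concurrent_streams=4))
```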


pyiceberg/io/pyarrow.py

Lines changed: 21 additions & 26 deletions
```diff
@@ -144,7 +144,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, ScanOrder, TableProperties
+from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, ScanOrder, TaskOrder, ArrivalOrder, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
```
```diff
@@ -1691,19 +1691,19 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[st
 def _bounded_concurrent_batches(
     tasks: list[FileScanTask],
     batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
-    concurrent_files: int,
+    concurrent_streams: int,
     max_buffered_batches: int = 16,
 ) -> Generator[pa.RecordBatch, None, None]:
     """Read batches from multiple files concurrently with bounded memory.
 
-    Uses a per-scan ThreadPoolExecutor(max_workers=concurrent_files) to naturally
+    Uses a per-scan ThreadPoolExecutor(max_workers=concurrent_streams) to naturally
     bound concurrency. Workers push batches into a bounded queue which provides
     backpressure when the consumer is slower than the producers.
 
     Args:
         tasks: The file scan tasks to process.
         batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
-        concurrent_files: Maximum number of files to read concurrently.
+        concurrent_streams: Maximum number of concurrent read streams.
         max_buffered_batches: Maximum number of batches to buffer in the queue.
     """
     if not tasks:
```
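The hunks above and below show only fragments of `_bounded_concurrent_batches`; a minimal self-contained sketch of the pattern its docstring describes (a bounded queue for backpressure, a sentinel enqueued by the last worker to finish) could look like the following. It is illustrative, not the pyiceberg implementation:

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Generator, Iterator, TypeVar

T = TypeVar("T")
_QUEUE_SENTINEL = object()  # marks end-of-stream for the consumer


def bounded_concurrent_iter(
    tasks: list[T],
    batch_fn: Callable[[T], Iterator[object]],
    concurrent_streams: int,
    max_buffered_batches: int = 16,
) -> Generator[object, None, None]:
    if not tasks:
        return
    batch_queue: "queue.Queue[object]" = queue.Queue(maxsize=max_buffered_batches)
    remaining = len(tasks)
    lock = threading.Lock()

    def worker(task: T) -> None:
        nonlocal remaining
        try:
            for batch in batch_fn(task):
                batch_queue.put(batch)  # blocks when the queue is full -> backpressure
        finally:
            with lock:
                remaining -= 1
                if remaining == 0:
                    batch_queue.put(_QUEUE_SENTINEL)  # last worker signals completion

    # max_workers naturally bounds how many tasks are read concurrently
    with ThreadPoolExecutor(max_workers=concurrent_streams) as executor:
        for task in tasks:
            executor.submit(worker, task)
        while True:
            batch = batch_queue.get()
            if batch is _QUEUE_SENTINEL:
                return
            yield batch
```

Because the sentinel is enqueued only after every worker has decremented `remaining`, FIFO ordering guarantees the consumer drains all real batches before seeing it.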
```diff
@@ -1730,7 +1730,7 @@ def worker(task: FileScanTask) -> None:
             if remaining == 0:
                 batch_queue.put(_QUEUE_SENTINEL)
 
-    with ThreadPoolExecutor(max_workers=concurrent_files) as executor:
+    with ThreadPoolExecutor(max_workers=concurrent_streams) as executor:
         for task in tasks:
             executor.submit(worker, task)
 
```
```diff
@@ -1838,8 +1838,7 @@ def to_record_batches(
         self,
         tasks: Iterable[FileScanTask],
         batch_size: int | None = None,
-        order: ScanOrder = ScanOrder.TASK,
-        concurrent_files: int = 1,
+        order: ScanOrder = TaskOrder(),
     ) -> Iterator[pa.RecordBatch]:
         """Scan the Iceberg table and return an Iterator[pa.RecordBatch].
 
```
```diff
@@ -1848,21 +1847,17 @@ def to_record_batches(
         Only data that matches the provided row_filter expression is returned.
 
         Ordering semantics:
-            - ScanOrder.TASK (default): Batches are grouped by file in task submission order.
-            - ScanOrder.ARRIVAL: Batches may be interleaved across files. Within each file,
-              batch ordering follows row order.
+            - TaskOrder() (default): Yields batches one file at a time in task submission order.
+            - ArrivalOrder(): Batches may be interleaved across files as they arrive.
+              Within each file, batch ordering follows row order.
 
         Args:
             tasks: FileScanTasks representing the data files and delete files to read from.
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             order: Controls the order in which record batches are returned.
-                ScanOrder.TASK (default) returns batches in task order, with each task
-                fully materialized before proceeding to the next. Allows parallel file
-                reads via executor. ScanOrder.ARRIVAL yields batches as they are
-                produced without materializing entire files into memory.
-            concurrent_files: Number of files to read concurrently when order=ScanOrder.ARRIVAL.
-                Must be >= 1. When > 1, batches may arrive interleaved across files.
-                Ignored when order=ScanOrder.TASK.
+                TaskOrder() (default) yields batches one file at a time in task order.
+                ArrivalOrder(concurrent_streams=N, max_buffered_batches=M) yields batches
+                as they are produced without materializing entire files into memory.
 
         Returns:
             An Iterator of PyArrow RecordBatches.
```
```diff
@@ -1871,18 +1866,17 @@ def to_record_batches(
         Raises:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type,
-                or when an invalid order value is provided, or when concurrent_files < 1.
+                or when an invalid order value is provided, or when concurrent_streams < 1.
         """
         if not isinstance(order, ScanOrder):
-            raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder enum value (ScanOrder.TASK or ScanOrder.ARRIVAL).")
-
-        if concurrent_files < 1:
-            raise ValueError(f"concurrent_files must be >= 1, got {concurrent_files}")
+            raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder instance (TaskOrder() or ArrivalOrder()).")
 
         task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)
 
-        if order == ScanOrder.ARRIVAL:
-            return self._apply_limit(self._iter_batches_arrival(task_list, deletes_per_file, batch_size, concurrent_files))
+        if isinstance(order, ArrivalOrder):
+            if order.concurrent_streams < 1:
+                raise ValueError(f"concurrent_streams must be >= 1, got {order.concurrent_streams}")
+            return self._apply_limit(self._iter_batches_arrival(task_list, deletes_per_file, batch_size, order.concurrent_streams, order.max_buffered_batches))
 
         return self._apply_limit(self._iter_batches_materialized(task_list, deletes_per_file, batch_size))
 
```
```diff
@@ -1899,14 +1893,15 @@ def _iter_batches_arrival(
         task_list: list[FileScanTask],
         deletes_per_file: dict[str, list[ChunkedArray]],
         batch_size: int | None,
-        concurrent_files: int,
+        concurrent_streams: int,
+        max_buffered_batches: int = 16,
     ) -> Iterator[pa.RecordBatch]:
         """Yield batches using bounded concurrent streaming in arrival order."""
 
         def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
             return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)
 
-        yield from _bounded_concurrent_batches(task_list, batch_fn, concurrent_files)
+        yield from _bounded_concurrent_batches(task_list, batch_fn, concurrent_streams, max_buffered_batches)
 
     def _iter_batches_materialized(
         self,
```
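Putting the pyarrow.py changes together, call sites of `ArrowScan.to_record_batches` now select a strategy object instead of an enum plus a loose flag. A hedged sketch (`arrow_scan`, `tasks`, and `process` are placeholders):

```python
from pyiceberg.table import ArrivalOrder, TaskOrder

# Default: each file fully materialized, in task submission order.
for batch in arrow_scan.to_record_batches(tasks, order=TaskOrder()):
    process(batch)

# Streaming: up to 4 concurrent read streams, at most 16 buffered batches.
for batch in arrow_scan.to_record_batches(
    tasks, order=ArrivalOrder(concurrent_streams=4, max_buffered_batches=16)
):
    process(batch)  # batches may be interleaved across files
```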

pyiceberg/table/__init__.py

Lines changed: 32 additions & 20 deletions
```diff
@@ -155,18 +155,31 @@
 DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
 
 
-class ScanOrder(str, Enum):
-    """Order in which record batches are returned from a scan.
-
-    Attributes:
-        TASK: Batches are returned in task order, with each task fully materialized
-            before proceeding to the next. Allows parallel file reads via executor.
-        ARRIVAL: Batches are yielded as they are produced, processing tasks
-            sequentially without materializing entire files into memory.
+@dataclass
+class ScanOrder(ABC):
+    """Base class for scan ordering strategies."""
+    pass
+
+
+@dataclass
+class TaskOrder(ScanOrder):
+    """Sequential task processing preserving existing behavior.
+
+    Batches are returned in task order, with each task fully materialized
+    before proceeding to the next. Allows parallel file reads via executor.
     """
+    pass
+
 
-    TASK = "task"
-    ARRIVAL = "arrival"
+@dataclass
+class ArrivalOrder(ScanOrder):
+    """Stream batches as they arrive from concurrent read streams.
+
+    Batches are yielded as they are produced without materializing entire
+    files into memory. Supports concurrent processing of multiple files.
+    """
+    concurrent_streams: int = 1
+    max_buffered_batches: int = 16
 
 
 @dataclass()
```
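Since `ScanOrder(ABC)` declares no abstract methods, the hierarchy acts as a typed tag: callers construct a strategy and the scan code dispatches on `isinstance`, as `to_record_batches` does above. A small sketch of how the new classes behave:

```python
order: ScanOrder = ArrivalOrder(concurrent_streams=4)

if isinstance(order, ArrivalOrder):
    # Arrival-only knobs are scoped to the class that uses them.
    print(order.concurrent_streams, order.max_buffered_batches)  # 4 16
elif isinstance(order, TaskOrder):
    pass  # sequential, fully-materialized path

# @dataclass also gives value equality for free:
assert ArrivalOrder() == ArrivalOrder(concurrent_streams=1, max_buffered_batches=16)
```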
```diff
@@ -2171,7 +2184,7 @@ def to_arrow(self) -> pa.Table:
         ).to_table(self.plan_files())
 
     def to_arrow_batch_reader(
-        self, batch_size: int | None = None, order: ScanOrder = ScanOrder.TASK, concurrent_files: int = 1
+        self, batch_size: int | None = None, order: ScanOrder = TaskOrder()
     ) -> pa.RecordBatchReader:
         """Return an Arrow RecordBatchReader from this DataScan.
 
```
```diff
@@ -2180,18 +2193,17 @@ def to_arrow_batch_reader(
         is read one at a time.
 
         Ordering semantics:
-            - ScanOrder.TASK (default): Batches are grouped by file in task submission order.
-            - ScanOrder.ARRIVAL: Batches may be interleaved across files. Within each file,
-              batch ordering follows row order.
+            - TaskOrder() (default): Yields batches one file at a time in file scan task order.
+            - ArrivalOrder(): Batches may be interleaved across files as they arrive.
+              Within each file, batch ordering follows row order.
 
         Args:
             batch_size: The number of rows per batch. If None, PyArrow's default is used.
             order: Controls the order in which record batches are returned.
-                ScanOrder.TASK (default) returns batches in task order with parallel
-                file reads. ScanOrder.ARRIVAL yields batches as they are produced
-                without materializing entire files into memory.
-            concurrent_files: Number of files to read concurrently when order=ScanOrder.ARRIVAL.
-                When > 1, batches may arrive interleaved across files.
+                TaskOrder() (default) yields batches one file at a time in task order.
+                ArrivalOrder(concurrent_streams=N, max_buffered_batches=M) yields batches
+                as they are produced without materializing entire files into memory.
+                concurrent_streams controls parallelism, max_buffered_batches controls memory.
 
         Returns:
             pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
```
```diff
@@ -2204,7 +2216,7 @@ def to_arrow_batch_reader(
         target_schema = schema_to_pyarrow(self.projection())
         batches = ArrowScan(
             self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files(), batch_size=batch_size, order=order, concurrent_files=concurrent_files)
+        ).to_record_batches(self.plan_files(), batch_size=batch_size, order=order)
 
         return pa.RecordBatchReader.from_batches(
             target_schema,
```
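End to end, the new reader API might be exercised like this (a sketch; the catalog and table names are placeholders):

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.table import ArrivalOrder

tbl = load_catalog("default").load_table("db.events")  # placeholder names

# Stream batches as they arrive from up to 8 concurrent read streams.
reader = tbl.scan().to_arrow_batch_reader(
    batch_size=1024,
    order=ArrivalOrder(concurrent_streams=8),
)
for batch in reader:
    print(batch.num_rows)
```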
