Skip to content

Commit 6b8dace

Browse files
committed
Improve comments in to_record_batches
1 parent 445845d commit 6b8dace

1 file changed

Lines changed: 8 additions & 4 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1611,23 +1611,27 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
16111611
total_row_count = 0
16121612
executor = ExecutorFactory.get_or_create()
16131613

1614+
def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
1615+
# Materialize the iterator here to ensure execution happens within the executor.
1616+
# Otherwise, the iterator would be lazily consumed later (in the main thread),
1617+
# defeating the purpose of using executor.map.
1618+
return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
1619+
16141620
limit_reached = False
1615-
for batches in executor.map(
1616-
lambda task: list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)), tasks
1617-
):
1621+
for batches in executor.map(batches_for_task, tasks):
16181622
for batch in batches:
16191623
current_batch_size = len(batch)
16201624
if self._limit is not None and total_row_count + current_batch_size >= self._limit:
16211625
yield batch.slice(0, self._limit - total_row_count)
16221626

1623-
# This break will also cancel all running tasks
16241627
limit_reached = True
16251628
break
16261629

16271630
yield batch
16281631
total_row_count += current_batch_size
16291632

16301633
if limit_reached:
1634+
# This break will also cancel all running tasks in the executor
16311635
break
16321636

16331637
def _record_batches_from_scan_tasks_and_deletes(

0 commit comments

Comments (0)