Skip to content

Commit f8acdb0

Browse files
committed
Use ExecutorFactory
1 parent 88a4ad2 commit f8acdb0

2 files changed

Lines changed: 5 additions & 3 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1646,12 +1646,10 @@ def to_record_batches(
16461646
ResolveError: When a required field cannot be found in the file
16471647
ValueError: When a field type in the file cannot be projected to the schema type
16481648
"""
1649-
from concurrent.futures import ThreadPoolExecutor
1650-
16511649
deletes_per_file = _read_all_delete_files(self._io, tasks)
16521650

16531651
if concurrent_tasks is not None:
1654-
with ThreadPoolExecutor(max_workers=concurrent_tasks) as pool:
1652+
with ExecutorFactory.create(max_workers=concurrent_tasks) as pool:
16551653
for batches in pool.map(
16561654
lambda task: list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)), tasks
16571655
):

pyiceberg/utils/concurrent.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,7 @@ def get_or_create() -> Executor:
3838
def max_workers() -> Optional[int]:
3939
"""Return the max number of workers configured."""
4040
return Config().get_int("max-workers")
41+
42+
@staticmethod
43+
def create(max_workers: int) -> Executor:
44+
return ThreadPoolExecutor(max_workers=max_workers)

0 commit comments

Comments (0)