@@ -69,7 +69,6 @@
     FileInfo,
     FileSystem,
     FileType,
-    FSSpecHandler,
 )
 from sortedcontainers import SortedList

@@ -117,7 +116,6 @@
     InputStream,
     OutputFile,
     OutputStream,
-    _parse_location,
 )
 from pyiceberg.manifest import (
     DataFile,
@@ -309,9 +307,7 @@ def open(self, seekable: bool = True) -> InputStream:
                 input_file = self._filesystem.open_input_file(self._path)
             else:
                 input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
-        except FileNotFoundError:
-            raise
-        except PermissionError:
+        except (FileNotFoundError, PermissionError):
             raise
         except OSError as e:
             if e.errno == 2 or "Path does not exist" in str(e):
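
For reference, a minimal standalone sketch of the consolidated error-translation pattern this hunk settles on; the open_or_translate wrapper is hypothetical, not part of the codebase:

import errno

def open_or_translate(opener):
    try:
        return opener()
    except (FileNotFoundError, PermissionError):
        # Already the right exception types; one tuple clause re-raises both.
        raise
    except OSError as e:
        # Generic OSErrors that signal a missing path (errno 2 / ENOENT, or a
        # backend message such as "Path does not exist") are translated.
        if e.errno == errno.ENOENT or "Path does not exist" in str(e):
            raise FileNotFoundError(str(e)) from e
        raise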
@@ -916,27 +912,20 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi
     raise ValueError(f"Unsupported file format: {file_format}")


-def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: Dict[str, Any] = EMPTY_DICT) -> ds.Fragment:
-    _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
-    return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs)
-
-
-def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
+def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
     if data_file.file_format == FileFormat.PARQUET:
-        delete_fragment = _construct_fragment(
-            fs,
-            data_file,
-            file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE},
-        )
-        table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
+        with io.new_input(data_file.file_path).open() as fi:
+            delete_fragment = _get_file_format(
+                data_file.file_format, dictionary_columns=("file_path",), pre_buffer=True, buffer_size=ONE_MEGABYTE
+            ).make_fragment(fi)
+            table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
         table = table.unify_dictionaries()
         return {
             file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
             for file in table.column("file_path").chunks[0].dictionary
         }
     elif data_file.file_format == FileFormat.PUFFIN:
-        _, _, path = PyArrowFileIO.parse_location(data_file.file_path)
-        with fs.open_input_file(path) as fi:
+        with io.new_input(data_file.file_path).open() as fi:
             payload = fi.read()

         return PuffinFile(payload).to_vector()
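
After this hunk, _read_deletes is backend-agnostic: instead of a resolved pyarrow FileSystem it takes any FileIO and opens the delete file through it. A hedged usage sketch, where data_file stands in for a real positional-delete DataFile taken from a scan task:

from pyiceberg.io.pyarrow import PyArrowFileIO, _read_deletes

io = PyArrowFileIO()
# data_file is a placeholder: a DataFile describing a positional-delete
# Parquet (or Puffin) file, e.g. one entry of FileScanTask.delete_files.
positions_by_path = _read_deletes(io, data_file)
for data_path, pos in positions_by_path.items():
    # pos is a pa.ChunkedArray of row positions deleted in data_path
    print(data_path, pos.to_pylist())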
@@ -1383,7 +1372,7 @@ def _get_column_projection_values(


 def _task_to_record_batches(
-    fs: FileSystem,
+    io: FileIO,
     task: FileScanTask,
     bound_row_filter: BooleanExpression,
     projected_schema: Schema,
@@ -1393,9 +1382,8 @@ def _task_to_record_batches(
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
 ) -> Iterator[pa.RecordBatch]:
-    _, _, path = _parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
-    with fs.open_input_file(path) as fin:
+    with io.new_input(task.file.file_path).open() as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
         # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema
@@ -1479,7 +1467,7 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st
     executor = ExecutorFactory.get_or_create()
     deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
         lambda args: _read_deletes(*args),
-        [(_fs_from_file_path(io, delete_file.file_path), delete_file) for delete_file in unique_deletes],
+        [(io, delete_file) for delete_file in unique_deletes],
     )
     for delete in deletes_per_files:
         for file, arr in delete.items():
@@ -1491,25 +1479,6 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[st
     return deletes_per_file


-def _fs_from_file_path(io: FileIO, file_path: str) -> FileSystem:
-    scheme, netloc, _ = _parse_location(file_path)
-    if isinstance(io, PyArrowFileIO):
-        return io.fs_by_scheme(scheme, netloc)
-    else:
-        try:
-            from pyiceberg.io.fsspec import FsspecFileIO
-
-            if isinstance(io, FsspecFileIO):
-                from pyarrow.fs import PyFileSystem
-
-                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
-            else:
-                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
-        except ModuleNotFoundError as e:
-            # When FsSpec is not installed
-            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
-
-
 class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
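
With _fs_from_file_path removed, scheme-to-filesystem resolution no longer leaks out of the FileIO implementations, and the FsspecFileIO special case disappears along with the FSSpecHandler import dropped at the top of this diff. A minimal sketch of the replacement call path, assuming a placeholder S3 URI:

from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()
# new_input() parses the URI and selects the backing filesystem internally,
# so callers never touch pyarrow.fs directly.
with io.new_input("s3://bucket/warehouse/data/00000-0.parquet").open() as f:
    magic = f.read(4)  # Parquet files begin with the b"PAR1" magic bytes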
@@ -1654,7 +1623,7 @@ def _record_batches_from_scan_tasks_and_deletes(
             if self._limit is not None and total_row_count >= self._limit:
                 break
             batches = _task_to_record_batches(
-                _fs_from_file_path(self._io, task.file.file_path),
+                self._io,
                 task,
                 self._bound_row_filter,
                 self._projected_schema,
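
At the public API level the refactor is invisible: a table scan still streams Arrow record batches, but data and delete files are now opened through the table's own FileIO whatever the backend. A sketch using the public scan API; the catalog name and table identifier are placeholders:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # placeholder catalog name
table = catalog.load_table("db.events")  # placeholder identifier
# to_arrow_batch_reader() drives the Arrow read path shown above, handing
# the table's FileIO straight down to _task_to_record_batches.
for batch in table.scan(limit=100).to_arrow_batch_reader():
    print(batch.num_rows)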