2020from typing import TYPE_CHECKING , Any , Dict , Iterator , List , Optional , Set , Tuple
2121
2222from pyiceberg .conversions import from_bytes
23- from pyiceberg .manifest import DataFile , DataFileContent , ManifestContent , ManifestFile , PartitionFieldSummary
23+ from pyiceberg .manifest import DataFile , DataFileContent , ManifestContent , PartitionFieldSummary
2424from pyiceberg .partitioning import PartitionSpec
2525from pyiceberg .table .snapshots import Snapshot , ancestors_of
2626from pyiceberg .types import PrimitiveType
@@ -523,73 +523,7 @@ def history(self) -> "pa.Table":
523523
524524 return pa .Table .from_pylist (history , schema = history_schema )
525525
526- def _get_files_from_manifest (
527- self , manifest_list : ManifestFile , data_file_filter : Optional [Set [DataFileContent ]] = None
528- ) -> "pa.Table" :
529- import pyarrow as pa
530-
531- files : list [dict [str , Any ]] = []
532- schema = self .tbl .metadata .schema ()
533- io = self .tbl .io
534-
535- for manifest_entry in manifest_list .fetch_manifest_entry (io ):
536- data_file = manifest_entry .data_file
537- if data_file_filter and data_file .content not in data_file_filter :
538- continue
539- column_sizes = data_file .column_sizes or {}
540- value_counts = data_file .value_counts or {}
541- null_value_counts = data_file .null_value_counts or {}
542- nan_value_counts = data_file .nan_value_counts or {}
543- lower_bounds = data_file .lower_bounds or {}
544- upper_bounds = data_file .upper_bounds or {}
545- readable_metrics = {
546- schema .find_column_name (field .field_id ): {
547- "column_size" : column_sizes .get (field .field_id ),
548- "value_count" : value_counts .get (field .field_id ),
549- "null_value_count" : null_value_counts .get (field .field_id ),
550- "nan_value_count" : nan_value_counts .get (field .field_id ),
551- "lower_bound" : from_bytes (field .field_type , lower_bound )
552- if (lower_bound := lower_bounds .get (field .field_id ))
553- else None ,
554- "upper_bound" : from_bytes (field .field_type , upper_bound )
555- if (upper_bound := upper_bounds .get (field .field_id ))
556- else None ,
557- }
558- for field in self .tbl .metadata .schema ().fields
559- }
560- partition = data_file .partition
561- partition_record_dict = {
562- field .name : partition [pos ]
563- for pos , field in enumerate (self .tbl .metadata .specs ()[manifest_list .partition_spec_id ].fields )
564- }
565- files .append (
566- {
567- "content" : data_file .content ,
568- "file_path" : data_file .file_path ,
569- "file_format" : data_file .file_format ,
570- "spec_id" : data_file .spec_id ,
571- "partition" : partition_record_dict ,
572- "record_count" : data_file .record_count ,
573- "file_size_in_bytes" : data_file .file_size_in_bytes ,
574- "column_sizes" : dict (data_file .column_sizes ) if data_file .column_sizes is not None else None ,
575- "value_counts" : dict (data_file .value_counts ) if data_file .value_counts is not None else None ,
576- "null_value_counts" : dict (data_file .null_value_counts ) if data_file .null_value_counts is not None else None ,
577- "nan_value_counts" : dict (data_file .nan_value_counts ) if data_file .nan_value_counts is not None else None ,
578- "lower_bounds" : dict (data_file .lower_bounds ) if data_file .lower_bounds is not None else None ,
579- "upper_bounds" : dict (data_file .upper_bounds ) if data_file .upper_bounds is not None else None ,
580- "key_metadata" : data_file .key_metadata ,
581- "split_offsets" : data_file .split_offsets ,
582- "equality_ids" : data_file .equality_ids ,
583- "sort_order_id" : data_file .sort_order_id ,
584- "readable_metrics" : readable_metrics ,
585- }
586- )
587- return pa .Table .from_pylist (
588- files ,
589- schema = self ._get_files_schema (),
590- )
591-
592- def _get_files_schema (self ) -> "pa.Schema" :
526+ def _files (self , snapshot_id : Optional [int ] = None , data_file_filter : Optional [Set [DataFileContent ]] = None ) -> "pa.Table" :
593527 import pyarrow as pa
594528
595529 from pyiceberg .io .pyarrow import schema_to_pyarrow
@@ -610,9 +544,6 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
610544 ]
611545 )
612546
613- partition_record = self .tbl .metadata .specs_struct ()
614- pa_record_struct = schema_to_pyarrow (partition_record )
615-
616547 for field in self .tbl .metadata .schema ().fields :
617548 readable_metrics_struct .append (
618549 pa .field (schema .find_column_name (field .field_id ), _readable_metrics_struct (field .field_type ), nullable = False )
@@ -624,7 +555,6 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
624555 pa .field ("file_path" , pa .string (), nullable = False ),
625556 pa .field ("file_format" , pa .dictionary (pa .int32 (), pa .string ()), nullable = False ),
626557 pa .field ("spec_id" , pa .int32 (), nullable = False ),
627- pa .field ("partition" , pa_record_struct , nullable = False ),
628558 pa .field ("record_count" , pa .int64 (), nullable = False ),
629559 pa .field ("file_size_in_bytes" , pa .int64 (), nullable = False ),
630560 pa .field ("column_sizes" , pa .map_ (pa .int32 (), pa .int64 ()), nullable = True ),
@@ -640,21 +570,71 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
640570 pa .field ("readable_metrics" , pa .struct (readable_metrics_struct ), nullable = True ),
641571 ]
642572 )
643- return files_schema
644573
645- def _files (self , snapshot_id : Optional [int ] = None , data_file_filter : Optional [Set [DataFileContent ]] = None ) -> "pa.Table" :
646- import pyarrow as pa
574+ files : list [dict [str , Any ]] = []
647575
648576 if not snapshot_id and not self .tbl .metadata .current_snapshot ():
649- return self ._get_files_schema ().empty_table ()
650-
577+ return pa .Table .from_pylist (
578+ files ,
579+ schema = files_schema ,
580+ )
651581 snapshot = self ._get_snapshot (snapshot_id )
582+
652583 io = self .tbl .io
653- files_table : list [pa .Table ] = []
654584 for manifest_list in snapshot .manifests (io ):
655- files_table .append (self ._get_files_from_manifest (manifest_list , data_file_filter ))
585+ for manifest_entry in manifest_list .fetch_manifest_entry (io ):
586+ data_file = manifest_entry .data_file
587+ if data_file_filter and data_file .content not in data_file_filter :
588+ continue
589+ column_sizes = data_file .column_sizes or {}
590+ value_counts = data_file .value_counts or {}
591+ null_value_counts = data_file .null_value_counts or {}
592+ nan_value_counts = data_file .nan_value_counts or {}
593+ lower_bounds = data_file .lower_bounds or {}
594+ upper_bounds = data_file .upper_bounds or {}
595+ readable_metrics = {
596+ schema .find_column_name (field .field_id ): {
597+ "column_size" : column_sizes .get (field .field_id ),
598+ "value_count" : value_counts .get (field .field_id ),
599+ "null_value_count" : null_value_counts .get (field .field_id ),
600+ "nan_value_count" : nan_value_counts .get (field .field_id ),
601+ "lower_bound" : from_bytes (field .field_type , lower_bound )
602+ if (lower_bound := lower_bounds .get (field .field_id ))
603+ else None ,
604+ "upper_bound" : from_bytes (field .field_type , upper_bound )
605+ if (upper_bound := upper_bounds .get (field .field_id ))
606+ else None ,
607+ }
608+ for field in self .tbl .metadata .schema ().fields
609+ }
610+ files .append (
611+ {
612+ "content" : data_file .content ,
613+ "file_path" : data_file .file_path ,
614+ "file_format" : data_file .file_format ,
615+ "spec_id" : data_file .spec_id ,
616+ "record_count" : data_file .record_count ,
617+ "file_size_in_bytes" : data_file .file_size_in_bytes ,
618+ "column_sizes" : dict (data_file .column_sizes ) if data_file .column_sizes is not None else None ,
619+ "value_counts" : dict (data_file .value_counts ) if data_file .value_counts is not None else None ,
620+ "null_value_counts" : dict (data_file .null_value_counts )
621+ if data_file .null_value_counts is not None
622+ else None ,
623+ "nan_value_counts" : dict (data_file .nan_value_counts ) if data_file .nan_value_counts is not None else None ,
624+ "lower_bounds" : dict (data_file .lower_bounds ) if data_file .lower_bounds is not None else None ,
625+ "upper_bounds" : dict (data_file .upper_bounds ) if data_file .upper_bounds is not None else None ,
626+ "key_metadata" : data_file .key_metadata ,
627+ "split_offsets" : data_file .split_offsets ,
628+ "equality_ids" : data_file .equality_ids ,
629+ "sort_order_id" : data_file .sort_order_id ,
630+ "readable_metrics" : readable_metrics ,
631+ }
632+ )
656633
657- return pa .concat_tables (files_table )
634+ return pa .Table .from_pylist (
635+ files ,
636+ schema = files_schema ,
637+ )
658638
659639 def files (self , snapshot_id : Optional [int ] = None ) -> "pa.Table" :
660640 return self ._files (snapshot_id )
@@ -665,12 +645,10 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
665645 def delete_files (self , snapshot_id : Optional [int ] = None ) -> "pa.Table" :
666646 return self ._files (snapshot_id , {DataFileContent .POSITION_DELETES , DataFileContent .EQUALITY_DELETES })
667647
668- def all_manifests (self , snapshots : Optional [ list [ Snapshot ]] = None ) -> "pa.Table" :
648+ def all_manifests (self ) -> "pa.Table" :
669649 import pyarrow as pa
670650
671- if snapshots is None :
672- snapshots = self .tbl .snapshots ()
673-
651+ snapshots = self .tbl .snapshots ()
674652 if not snapshots :
675653 return pa .Table .from_pylist ([], schema = self ._get_all_manifests_schema ())
676654
@@ -679,30 +657,3 @@ def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table
679657 lambda args : self ._generate_manifests_table (* args ), [(snapshot , True ) for snapshot in snapshots ]
680658 )
681659 return pa .concat_tables (manifests_by_snapshots )
682-
683- def _all_files (self , data_file_filter : Optional [Set [DataFileContent ]] = None ) -> "pa.Table" :
684- import pyarrow as pa
685-
686- snapshots = self .tbl .snapshots ()
687- if not snapshots :
688- return pa .Table .from_pylist ([], schema = self ._get_files_schema ())
689-
690- executor = ExecutorFactory .get_or_create ()
691- manifest_lists = executor .map (lambda snapshot : snapshot .manifests (self .tbl .io ), snapshots )
692-
693- unique_manifests = {(manifest .manifest_path , manifest ) for manifest_list in manifest_lists for manifest in manifest_list }
694-
695- file_lists = executor .map (
696- lambda args : self ._get_files_from_manifest (* args ), [(manifest , data_file_filter ) for _ , manifest in unique_manifests ]
697- )
698-
699- return pa .concat_tables (file_lists )
700-
701- def all_files (self ) -> "pa.Table" :
702- return self ._all_files ()
703-
704- def all_data_files (self ) -> "pa.Table" :
705- return self ._all_files ({DataFileContent .DATA })
706-
707- def all_delete_files (self ) -> "pa.Table" :
708- return self ._all_files ({DataFileContent .POSITION_DELETES , DataFileContent .EQUALITY_DELETES })
0 commit comments