Skip to content

Commit 2d34662

Browse files
author
Roman Shanin
committed
Fix projected fields predicate evaluation
1 parent c467957 commit 2d34662

2 files changed

Lines changed: 15 additions & 9 deletions

File tree

pyiceberg/expressions/visitors.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -870,9 +870,10 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
870870
file_schema: Schema
871871
case_sensitive: bool
872872

873-
def __init__(self, file_schema: Schema, case_sensitive: bool) -> None:
873+
def __init__(self, file_schema: Schema, case_sensitive: bool, projected_missing_fields: dict[str, Any]) -> None:
874874
self.file_schema = file_schema
875875
self.case_sensitive = case_sensitive
876+
self.projected_missing_fields = projected_missing_fields
876877

877878
def visit_true(self) -> BooleanExpression:
878879
return AlwaysTrue()
@@ -894,12 +895,17 @@ def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpr
894895

895896
def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
896897
file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id)
898+
field_name = predicate.term.ref().field.name
897899

898900
if file_column_name is None:
899901
# In the case of schema evolution, the column might not be present
900902
# in the file schema when reading older data
901903
if isinstance(predicate, BoundIsNull):
902904
return AlwaysTrue()
905+
# Projected fields are only available for identity partition fields
906+
# Which mean that partition pruning excluded partition field which evaluates to false
907+
elif field_name in self.projected_missing_fields:
908+
return AlwaysTrue()
903909
else:
904910
return AlwaysFalse()
905911

@@ -913,8 +919,8 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi
913919
raise ValueError(f"Unsupported predicate: {predicate}")
914920

915921

916-
def translate_column_names(expr: BooleanExpression, file_schema: Schema, case_sensitive: bool) -> BooleanExpression:
917-
return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive))
922+
def translate_column_names(expr: BooleanExpression, file_schema: Schema, case_sensitive: bool, projected_missing_fields: dict[str, Any]) -> BooleanExpression:
923+
return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive, projected_missing_fields))
918924

919925

920926
class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]):

pyiceberg/io/pyarrow.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,18 +1404,18 @@ def _task_to_record_batches(
14041404
# the table format version.
14051405
file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
14061406

1407-
pyarrow_filter = None
1408-
if bound_row_filter is not AlwaysTrue():
1409-
translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
1410-
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1411-
pyarrow_filter = expression_to_pyarrow(bound_file_filter)
1412-
14131407
# Apply column projection rules
14141408
# https://iceberg.apache.org/spec/#column-projection
14151409
should_project_columns, projected_missing_fields = _get_column_projection_values(
14161410
task.file, projected_schema, partition_spec, file_schema.field_ids
14171411
)
14181412

1413+
pyarrow_filter = None
1414+
if bound_row_filter is not AlwaysTrue():
1415+
translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_missing_fields=projected_missing_fields)
1416+
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1417+
pyarrow_filter = expression_to_pyarrow(bound_file_filter)
1418+
14191419
file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
14201420

14211421
fragment_scanner = ds.Scanner.from_fragment(

0 commit comments

Comments
 (0)