Skip to content

Commit a8dbf6b

Browse files
author
Roman Shanin
committed
extend signatue of translate_column_names
1 parent 5e975d5 commit a8dbf6b

2 files changed

Lines changed: 27 additions & 9 deletions

File tree

pyiceberg/expressions/visitors.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,9 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
860860
861861
Args:
862862
file_schema (Schema): The schema of the file.
863+
projected_schema (Schema): The schema to project onto the data files.
863864
case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True.
865+
projected_missing_fields(dict[str, Any]): Map of fields missing in file_schema, but present as partition values.
864866
865867
Raises:
866868
TypeError: In the case of an UnboundPredicate.
@@ -870,9 +872,13 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
870872
file_schema: Schema
871873
case_sensitive: bool
872874

873-
def __init__(self, file_schema: Schema, case_sensitive: bool) -> None:
875+
def __init__(
876+
self, file_schema: Schema, projected_schema: Schema, case_sensitive: bool, projected_missing_fields: dict[str, Any]
877+
) -> None:
874878
self.file_schema = file_schema
879+
self.projected_schema = projected_schema
875880
self.case_sensitive = case_sensitive
881+
self.projected_missing_fields = projected_missing_fields
876882

877883
def visit_true(self) -> BooleanExpression:
878884
return AlwaysTrue()
@@ -913,8 +919,14 @@ def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpressi
913919
raise ValueError(f"Unsupported predicate: {predicate}")
914920

915921

916-
def translate_column_names(expr: BooleanExpression, file_schema: Schema, case_sensitive: bool) -> BooleanExpression:
917-
return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive))
922+
def translate_column_names(
923+
expr: BooleanExpression,
924+
file_schema: Schema,
925+
projected_schema: Schema,
926+
case_sensitive: bool,
927+
projected_missing_fields: dict[str, Any],
928+
) -> BooleanExpression:
929+
return visit(expr, _ColumnNameTranslator(file_schema, projected_schema, case_sensitive, projected_missing_fields))
918930

919931

920932
class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]):

pyiceberg/io/pyarrow.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,18 +1458,24 @@ def _task_to_record_batches(
14581458
# the table format version.
14591459
file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
14601460

1461-
pyarrow_filter = None
1462-
if bound_row_filter is not AlwaysTrue():
1463-
translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
1464-
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1465-
pyarrow_filter = expression_to_pyarrow(bound_file_filter)
1466-
14671461
# Apply column projection rules
14681462
# https://iceberg.apache.org/spec/#column-projection
14691463
should_project_columns, projected_missing_fields = _get_column_projection_values(
14701464
task.file, projected_schema, partition_spec, file_schema.field_ids
14711465
)
14721466

1467+
pyarrow_filter = None
1468+
if bound_row_filter is not AlwaysTrue():
1469+
translated_row_filter = translate_column_names(
1470+
bound_row_filter,
1471+
file_schema,
1472+
projected_schema,
1473+
case_sensitive=case_sensitive,
1474+
projected_missing_fields=projected_missing_fields,
1475+
)
1476+
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
1477+
pyarrow_filter = expression_to_pyarrow(bound_file_filter)
1478+
14731479
file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
14741480

14751481
fragment_scanner = ds.Scanner.from_fragment(

0 commit comments

Comments
 (0)