Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions python/ray/data/_internal/datasource/iceberg_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@


class _IcebergExpressionVisitor(
_ExprVisitor["BooleanExpression | UnboundTerm[Any] | Literal[Any]"]
_ExprVisitor["BooleanExpression | UnboundTerm | Literal"]
):
"""
Visitor that converts Ray Data expressions to PyIceberg expressions.
Expand All @@ -99,11 +99,11 @@ class _IcebergExpressionVisitor(
>>> # iceberg_expr can now be used with PyIceberg's filter APIs
"""

def visit_column(self, expr: "ColumnExpr") -> "UnboundTerm[Any]":
def visit_column(self, expr: "ColumnExpr") -> "UnboundTerm":
"""Convert a column reference to an Iceberg reference."""
return Reference(expr.name)

def visit_literal(self, expr: "LiteralExpr") -> "Literal[Any]":
def visit_literal(self, expr: "LiteralExpr") -> "Literal":
"""Convert a literal value to an Iceberg literal."""
return literal(expr.value)

Expand Down Expand Up @@ -148,13 +148,11 @@ def visit_unary(self, expr: "UnaryExpr") -> "BooleanExpression":

def visit_alias(
self, expr: "AliasExpr"
) -> "BooleanExpression | UnboundTerm[Any] | Literal[Any]":
) -> "BooleanExpression | UnboundTerm | Literal":
"""Convert an aliased expression (just unwrap the alias)."""
return self.visit(expr.expr)

def visit_udf(
self, expr: "UDFExpr"
) -> "BooleanExpression | UnboundTerm[Any] | Literal[Any]":
def visit_udf(self, expr: "UDFExpr") -> "BooleanExpression | UnboundTerm | Literal":
"""UDF expressions cannot be converted to Iceberg expressions."""
raise TypeError(
"UDF expressions cannot be converted to Iceberg expressions. "
Expand All @@ -163,23 +161,23 @@ def visit_udf(

def visit_download(
self, expr: "DownloadExpr"
) -> "BooleanExpression | UnboundTerm[Any] | Literal[Any]":
) -> "BooleanExpression | UnboundTerm | Literal":
"""Download expressions cannot be converted to Iceberg expressions."""
raise TypeError(
"Download expressions cannot be converted to Iceberg expressions."
)

def visit_star(
self, expr: "StarExpr"
) -> "BooleanExpression | UnboundTerm[Any] | Literal[Any]":
) -> "BooleanExpression | UnboundTerm | Literal":
"""Star expressions cannot be converted to Iceberg expressions."""
raise TypeError(
"Star expressions cannot be converted to Iceberg filter expressions."
)

def visit_monotonically_increasing_id(
self, expr: "MonotonicallyIncreasingIdExpr"
) -> "BooleanExpression | UnboundTerm[Any] | Literal[Any]":
) -> "BooleanExpression | UnboundTerm | Literal":
"""Monotonically increasing ID expressions cannot be converted to Iceberg expressions."""
raise TypeError(
"monotonically_increasing_id expressions cannot be converted to Iceberg filter expressions."
Expand Down Expand Up @@ -490,7 +488,7 @@ def get_read_tasks(
metadata = BlockMetadata(
num_rows=sum(task.file.record_count for task in chunk_tasks)
- position_delete_count,
size_bytes=sum(task.length for task in chunk_tasks),
size_bytes=sum(task.file.file_size_in_bytes for task in chunk_tasks),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scan size metadata overcounts split file tasks

Medium Severity

size_bytes now sums task.file.file_size_in_bytes instead of each FileScanTask range length. When plan_files contains split tasks for the same file, BlockMetadata.size_bytes is inflated and may be double-counted, which distorts Ray Data’s task size estimation and scheduling behavior.

Fix in Cursor Fix in Web

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd make this work for both Pyiceberg <0.11 and >=0.11

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's an existing property in pyiceberg 0.10.0

input_files=[task.file.file_path for task in chunk_tasks],
exec_stats=None,
)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/datasource/test_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def test_write_basic():
table_p = (
ds.to_pandas().sort_values(["col_a", "col_b", "col_c"]).reset_index(drop=True)
)
assert orig_table_p.equals(table_p)
assert rows_same(table_p, orig_table_p)


@pytest.mark.skipif(
Expand Down
2 changes: 1 addition & 1 deletion python/requirements/ml/data-test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ deltalake==0.9.0
pytest-mock
decord
snowflake-connector-python>=3.15.0
pyiceberg[sql-sqlite]==0.10.0
pyiceberg[sql-sqlite]==0.11.0
clickhouse-connect
kafka-python
pybase64
Expand Down
7 changes: 4 additions & 3 deletions python/requirements_compiled.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1656,7 +1656,7 @@ pygments==2.18.0
# nbconvert
# rich
# sphinx
pyiceberg==0.10.0
pyiceberg==0.11.0
# via -r python/requirements/ml/data-test-requirements.txt
pyjwt==2.8.0
# via
Expand Down Expand Up @@ -2107,7 +2107,6 @@ snowflake-connector-python==3.15.0
sortedcontainers==2.4.0
# via
# distributed
# pyiceberg
# snowflake-connector-python
soundfile==0.12.1
# via -r python/requirements/ml/data-test-requirements.txt
Expand Down Expand Up @@ -2589,7 +2588,9 @@ zipp==3.19.2
zoopt==0.4.1
# via -r python/requirements/ml/tune-test-requirements.txt
zstandard==0.23.0
# via clickhouse-connect
# via
# clickhouse-connect
# pyiceberg

# The following packages are considered to be unsafe in a requirements file:
# pip
Expand Down
2 changes: 1 addition & 1 deletion python/requirements_compiled_py3.13.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1699,7 +1699,7 @@ pygments==2.18.0
# nbconvert
# rich
# sphinx
pyiceberg==0.10.0
pyiceberg==0.11.0
# via -r python/requirements/ml/data-test-requirements.txt
pyjwt==2.8.0
# via
Expand Down