Skip to content

Commit 53c9107

Browse files
committed
Merge branch 'main' into maint/catalog-impl-roundtripping
2 parents c7d70ed + e9c0253 commit 53c9107

21 files changed

Lines changed: 1343 additions & 645 deletions

mkdocs/docs/api.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1523,6 +1523,52 @@ print(ray_dataset.take(2))
15231523
]
15241524
```
15251525

1526+
### Bodo
1527+
1528+
PyIceberg interfaces closely with Bodo DataFrames (see [Bodo Iceberg Quick Start](https://docs.bodo.ai/latest/quick_start/quickstart_local_iceberg/)),
1529+
which provides a drop-in replacement for Pandas that applies query, compiler and HPC optimizations automatically.
1530+
Bodo accelerates and scales Python code from single laptops to large clusters without code rewrites.
1531+
1532+
<!-- prettier-ignore-start -->
1533+
1534+
!!! note "Requirements"
1535+
This requires [`bodo` to be installed](index.md).
1536+
1537+
```python
1538+
pip install pyiceberg['bodo']
1539+
```
1540+
<!-- prettier-ignore-end -->
1541+
1542+
A table can be read easily into a Bodo DataFrame to perform Pandas operations:
1543+
1544+
```python
1545+
df = table.to_bodo() # equivalent to `bodo.pandas.read_iceberg_table(table)`
1546+
df = df[df["trip_distance"] >= 10.0]
1547+
df = df[["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]]
1548+
print(df)
1549+
```
1550+
1551+
This creates a lazy query, optimizes it, and runs it on all available cores (print triggers execution):
1552+
1553+
```python
1554+
VendorID tpep_pickup_datetime tpep_dropoff_datetime
1555+
0 2 2023-01-01 00:27:12 2023-01-01 00:49:56
1556+
1 2 2023-01-01 00:09:29 2023-01-01 00:29:23
1557+
2 1 2023-01-01 00:13:30 2023-01-01 00:44:00
1558+
3 2 2023-01-01 00:41:41 2023-01-01 01:19:32
1559+
4 2 2023-01-01 00:22:39 2023-01-01 01:30:45
1560+
... ... ... ...
1561+
245478 2 2023-01-31 22:32:57 2023-01-31 23:01:48
1562+
245479 2 2023-01-31 22:03:26 2023-01-31 22:46:13
1563+
245480 2 2023-01-31 23:25:56 2023-02-01 00:05:42
1564+
245481 2 2023-01-31 23:18:00 2023-01-31 23:46:00
1565+
245482 2 2023-01-31 23:18:00 2023-01-31 23:41:00
1566+
1567+
[245483 rows x 3 columns]
1568+
```
1569+
1570+
Bodo is optimized to take advantage of Iceberg features such as hidden partitioning and various statistics for efficient reads.
1571+
15261572
### Daft
15271573

15281574
PyIceberg interfaces closely with Daft Dataframes (see also: [Daft integration with Iceberg](https://docs.daft.ai/en/stable/io/iceberg/)) which provides a full lazily optimized query engine interface on top of PyIceberg tables.

mkdocs/docs/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ You can mix and match optional dependencies depending on your needs:
5252
| pandas | Installs both PyArrow and Pandas |
5353
| duckdb | Installs both PyArrow and DuckDB |
5454
| ray | Installs PyArrow, Pandas, and Ray |
55+
| bodo | Installs Bodo |
5556
| daft | Installs Daft |
5657
| polars | Installs Polars |
5758
| s3fs | S3FS as a FileIO implementation to interact with the object store |

poetry.lock

Lines changed: 820 additions & 602 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyiceberg/io/pyarrow.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2728,9 +2728,11 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
27282728

27292729
for partition, name in zip(spec.fields, partition_fields):
27302730
source_field = schema.find_field(partition.source_id)
2731-
arrow_table = arrow_table.append_column(
2732-
name, partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name])
2733-
)
2731+
full_field_name = schema.find_column_name(partition.source_id)
2732+
if full_field_name is None:
2733+
raise ValueError(f"Could not find column name for field ID: {partition.source_id}")
2734+
field_array = _get_field_from_arrow_table(arrow_table, full_field_name)
2735+
arrow_table = arrow_table.append_column(name, partition.transform.pyarrow_transform(source_field.field_type)(field_array))
27342736

27352737
unique_partition_fields = arrow_table.select(partition_fields).group_by(partition_fields).aggregate([])
27362738

@@ -2765,3 +2767,32 @@ def _determine_partitions(spec: PartitionSpec, schema: Schema, arrow_table: pa.T
27652767
)
27662768

27672769
return table_partitions
2770+
2771+
2772+
def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Array:
    """Resolve ``field_path`` against ``arrow_table`` and return the matching array.

    Resolution order:
    1. Exact top-level column match, so column names that contain a literal
       dot (e.g. "some.id") are honored first.
    2. Dot-separated nested access (e.g. "bar.baz"): the first segment names
       a struct column and the remaining segments navigate into it.

    Args:
        arrow_table: The Arrow table containing the field.
        field_path: Field name or dot-separated path.

    Returns:
        The field as a PyArrow Array.

    Raises:
        KeyError: If the field path cannot be resolved.
    """
    # A column literally named e.g. "a.b" must win over nested interpretation.
    if field_path in arrow_table.column_names:
        return arrow_table[field_path]

    # No exact match: interpret the dots as struct navigation.
    head, *nested = field_path.split(".")
    return pc.struct_field(arrow_table[head], nested)

pyiceberg/table/__init__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@
137137
from pyiceberg.utils.properties import property_as_bool
138138

139139
if TYPE_CHECKING:
140+
import bodo.pandas as bd
140141
import daft
141142
import pandas as pd
142143
import polars as pl
@@ -1485,6 +1486,16 @@ def to_daft(self) -> daft.DataFrame:
14851486

14861487
return daft.read_iceberg(self)
14871488

1489+
def to_bodo(self) -> bd.DataFrame:
    """Read a Bodo DataFrame lazily from this Iceberg table.

    Returns:
        bd.DataFrame: Unmaterialized Bodo DataFrame backed by this Iceberg table.
    """
    # Imported lazily so bodo stays an optional dependency.
    from bodo.pandas import read_iceberg_table

    return read_iceberg_table(self)
1498+
14881499
def to_polars(self) -> pl.LazyFrame:
14891500
"""Lazily read from this Apache Iceberg table.
14901501
@@ -1691,7 +1702,14 @@ def to_polars(self) -> pl.DataFrame: ...
16911702

16921703
def update(self: S, **overrides: Any) -> S:
    """Return a copy of this table scan with the given fields replaced.

    Only attributes that correspond to constructor parameters are carried
    over; extra attributes on ``self`` that the constructor does not accept
    are deliberately ignored.
    """
    from inspect import signature

    cls = type(self)
    # Mirror the constructor signature rather than self.__dict__, which may
    # contain attributes the constructor does not accept.
    init_params = [name for name in signature(cls.__init__).parameters if name != "self"]
    kwargs = {name: getattr(self, name) for name in init_params}  # Assume parameters are attributes
    kwargs.update(overrides)
    return cls(**kwargs)
16951713

16961714
def use_ref(self: S, name: str) -> S:
16971715
if self.snapshot_id:

pyiceberg/table/snapshots.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
TOTAL_FILE_SIZE = "total-files-size"
5959
CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"
6060
CHANGED_PARTITION_PREFIX = "partitions."
61+
PARTITION_SUMMARY_PROP = "partition-summaries-included"
6162
OPERATION = "operation"
6263

6364
INITIAL_SEQUENCE_NUMBER = 0
@@ -306,6 +307,8 @@ def build(self) -> Dict[str, str]:
306307
changed_partitions_size = len(self.partition_metrics)
307308
set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP)
308309
if changed_partitions_size <= self.max_changed_partitions_for_summaries:
310+
if changed_partitions_size > 0:
311+
properties[PARTITION_SUMMARY_PROP] = "true"
309312
for partition_path, update_metrics_partition in self.partition_metrics.items():
310313
if (summary := self._partition_summary(update_metrics_partition)) and len(summary) != 0:
311314
properties[CHANGED_PARTITION_PREFIX + partition_path] = summary

pyiceberg/table/statistics.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
from typing import Dict, List, Literal, Optional
17+
from typing import Dict, List, Literal, Optional, Union
1818

1919
from pydantic import Field
2020

@@ -48,7 +48,7 @@ class PartitionStatisticsFile(StatisticsCommonFields):
4848

4949

5050
def filter_statistics_by_snapshot_id(
    statistics: List[Union[StatisticsFile, PartitionStatisticsFile]],
    reject_snapshot_id: int,
) -> List[Union[StatisticsFile, PartitionStatisticsFile]]:
    """Return ``statistics`` without any entry belonging to ``reject_snapshot_id``."""

    def keep(stat: Union[StatisticsFile, PartitionStatisticsFile]) -> bool:
        return stat.snapshot_id != reject_snapshot_id

    return list(filter(keep, statistics))

pyiceberg/table/update/__init__.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@
3636
SnapshotLogEntry,
3737
)
3838
from pyiceberg.table.sorting import SortOrder
39-
from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id
39+
from pyiceberg.table.statistics import (
40+
PartitionStatisticsFile,
41+
StatisticsFile,
42+
filter_statistics_by_snapshot_id,
43+
)
4044
from pyiceberg.typedef import (
4145
IcebergBaseModel,
4246
Properties,
@@ -198,6 +202,16 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
198202
snapshot_id: int = Field(alias="snapshot-id")
199203

200204

205+
class SetPartitionStatisticsUpdate(IcebergBaseModel):
    """TableUpdate that attaches a partition statistics file to the table metadata."""

    action: Literal["set-partition-statistics"] = Field(default="set-partition-statistics")
    # Alias keeps the wire format kebab-case, consistent with the "snapshot-id"
    # alias on RemovePartitionStatisticsUpdate and the other table updates.
    partition_statistics: PartitionStatisticsFile = Field(alias="partition-statistics")
208+
209+
210+
class RemovePartitionStatisticsUpdate(IcebergBaseModel):
    """TableUpdate that removes the partition statistics file for one snapshot."""

    action: Literal["remove-partition-statistics"] = Field(default="remove-partition-statistics")
    # "snapshot-id" is the kebab-case wire name; snapshot_id is the Python attribute.
    snapshot_id: int = Field(alias="snapshot-id")
213+
214+
201215
TableUpdate = Annotated[
202216
Union[
203217
AssignUUIDUpdate,
@@ -217,6 +231,8 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
217231
RemovePropertiesUpdate,
218232
SetStatisticsUpdate,
219233
RemoveStatisticsUpdate,
234+
SetPartitionStatisticsUpdate,
235+
RemovePartitionStatisticsUpdate,
220236
],
221237
Field(discriminator="action"),
222238
]
@@ -582,6 +598,29 @@ def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _Ta
582598
return base_metadata.model_copy(update={"statistics": statistics})
583599

584600

601+
@_apply_table_update.register(SetPartitionStatisticsUpdate)
def _(update: SetPartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Replace any partition-statistics entry for the update's snapshot with the new file."""
    new_file = update.partition_statistics
    # Drop any existing entry for the same snapshot, then append the new one.
    retained = filter_statistics_by_snapshot_id(base_metadata.partition_statistics, new_file.snapshot_id)
    context.add_update(update)
    return base_metadata.model_copy(update={"partition_statistics": [*retained, new_file]})
610+
611+
@_apply_table_update.register(RemovePartitionStatisticsUpdate)
def _(
    update: RemovePartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext
) -> TableMetadata:
    """Remove the partition-statistics entry for the given snapshot, failing if absent."""
    known_snapshots = {part_stat.snapshot_id for part_stat in base_metadata.partition_statistics}
    if update.snapshot_id not in known_snapshots:
        raise ValueError(f"Partition Statistics with snapshot id {update.snapshot_id} does not exist")

    remaining = filter_statistics_by_snapshot_id(base_metadata.partition_statistics, update.snapshot_id)
    context.add_update(update)
    return base_metadata.model_copy(update={"partition_statistics": remaining})
622+
623+
585624
def update_table_metadata(
586625
base_metadata: TableMetadata,
587626
updates: Tuple[TableUpdate, ...],

pyiceberg/transforms.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
import base64
1919
import datetime as py_datetime
20+
import importlib
2021
import struct
22+
import types
2123
from abc import ABC, abstractmethod
2224
from enum import IntEnum
2325
from functools import singledispatch
@@ -28,6 +30,7 @@
2830
import mmh3
2931
from pydantic import Field, PositiveInt, PrivateAttr
3032

33+
from pyiceberg.exceptions import NotInstalledError
3134
from pyiceberg.expressions import (
3235
BoundEqualTo,
3336
BoundGreaterThan,
@@ -106,6 +109,17 @@
106109
TRUNCATE_PARSER = ParseNumberFromBrackets(TRUNCATE)
107110

108111

112+
def _try_import(module_name: str, extras_name: Optional[str] = None) -> types.ModuleType:
113+
try:
114+
return importlib.import_module(module_name)
115+
except ImportError:
116+
if extras_name:
117+
msg = f'{module_name} needs to be installed. pip install "pyiceberg[{extras_name}]"'
118+
else:
119+
msg = f"{module_name} needs to be installed."
120+
raise NotInstalledError(msg) from None
121+
122+
109123
def _transform_literal(func: Callable[[L], L], lit: Literal[L]) -> Literal[L]:
110124
"""Small helper to upwrap the value from the literal, and wrap it again."""
111125
return literal(func(lit.value))
@@ -382,8 +396,7 @@ def __repr__(self) -> str:
382396
return f"BucketTransform(num_buckets={self._num_buckets})"
383397

384398
def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a callable that applies the bucket transform to a PyArrow array.

    ``source`` is part of the shared ``pyarrow_transform`` interface; it is
    not used by this implementation.
    """
    # Lazy import with an actionable message if the optional dependency is missing.
    pyiceberg_core_transform = _try_import("pyiceberg_core", extras_name="pyiceberg-core").transform
    return _pyiceberg_transform_wrapper(pyiceberg_core_transform.bucket, self._num_buckets)
388401

389402
@property
@@ -509,9 +522,8 @@ def __repr__(self) -> str:
509522
return "YearTransform()"
510523

511524
def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a callable that applies the year transform to a PyArrow array.

    ``source`` is part of the shared ``pyarrow_transform`` interface; it is
    not used by this implementation. The result is cast to ``pa.int32()``.
    """
    # Consistency fix: pass extras_name so a missing pyarrow reports the pip
    # extra to install, matching DayTransform.pyarrow_transform.
    pa = _try_import("pyarrow", extras_name="pyarrow")
    pyiceberg_core_transform = _try_import("pyiceberg_core", extras_name="pyiceberg-core").transform
    return _pyiceberg_transform_wrapper(pyiceberg_core_transform.year, expected_type=pa.int32())

517529

@@ -570,8 +582,8 @@ def __repr__(self) -> str:
570582
return "MonthTransform()"
571583

572584
def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a callable that applies the month transform to a PyArrow array.

    ``source`` is part of the shared ``pyarrow_transform`` interface; it is
    not used by this implementation. The result is cast to ``pa.int32()``.
    """
    # Consistency fix: pass extras_name so a missing pyarrow reports the pip
    # extra to install, matching DayTransform.pyarrow_transform.
    pa = _try_import("pyarrow", extras_name="pyarrow")
    pyiceberg_core_transform = _try_import("pyiceberg_core", extras_name="pyiceberg-core").transform

    return _pyiceberg_transform_wrapper(pyiceberg_core_transform.month, expected_type=pa.int32())
577589

@@ -639,8 +651,8 @@ def __repr__(self) -> str:
639651
return "DayTransform()"
640652

641653
def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a callable that applies the day transform to a PyArrow array.

    ``source`` is part of the shared ``pyarrow_transform`` interface; it is
    not used by this implementation. The result is cast to ``pa.int32()``.
    """
    # extras_name yields a pip-install hint when the optional dependency is absent.
    pa = _try_import("pyarrow", extras_name="pyarrow")
    pyiceberg_core_transform = _try_import("pyiceberg_core", extras_name="pyiceberg-core").transform

    return _pyiceberg_transform_wrapper(pyiceberg_core_transform.day, expected_type=pa.int32())
646658

@@ -692,7 +704,7 @@ def __repr__(self) -> str:
692704
return "HourTransform()"
693705

694706
def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a callable that applies the hour transform to a PyArrow array.

    ``source`` is part of the shared ``pyarrow_transform`` interface; it is
    not used by this implementation.
    """
    pyiceberg_core_transform = _try_import("pyiceberg_core", extras_name="pyiceberg-core").transform

    return _pyiceberg_transform_wrapper(pyiceberg_core_transform.hour)
698710

@@ -915,7 +927,7 @@ def __repr__(self) -> str:
915927
return f"TruncateTransform(width={self._width})"
916928

917929
def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]":
    """Return a callable that applies the truncate transform to a PyArrow array.

    ``source`` is part of the shared ``pyarrow_transform`` interface; it is
    not used by this implementation.
    """
    pyiceberg_core_transform = _try_import("pyiceberg_core", extras_name="pyiceberg-core").transform

    return _pyiceberg_transform_wrapper(pyiceberg_core_transform.truncate, self._width)
921933

pyiceberg/utils/schema_conversion.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,10 @@
6969
# Maps (Avro logical type, Avro physical type) pairs to Iceberg primitive types.
LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = {
    ("date", "int"): DateType(),
    ("time-micros", "long"): TimeType(),
    # Per the Avro spec, timestamp-millis annotates a long. The ("timestamp-millis", "int")
    # entry is kept for backward compatibility with non-conforming writers.
    ("timestamp-millis", "int"): TimestampType(),
    ("timestamp-millis", "long"): TimestampType(),
    ("timestamp-micros", "long"): TimestampType(),
    ("uuid", "fixed"): UUIDType(),
    ("uuid", "string"): UUIDType(),
}
7577

7678
AvroType = Union[str, Any]

0 commit comments

Comments
 (0)