Skip to content

Commit f26fcac

Browse files
committed
Merge remote-tracking branch 'upstream/main'
Merge with upstream
2 parents 3fbae72 + 5d6e1e2 commit f26fcac

9 files changed

Lines changed: 118 additions & 45 deletions

File tree

.github/pull_request_template.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ Thanks for opening a pull request!
77

88
# Rationale for this change
99

10-
# Are these changes tested?
10+
## Are these changes tested?
1111

12-
# Are there any user-facing changes?
12+
## Are there any user-facing changes?
1313

1414
<!-- In the case of user-facing changes, please add the changelog label. -->

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ repos:
3939
args:
4040
[--install-types, --non-interactive, --config=pyproject.toml]
4141
- repo: https://github.com/igorshubovych/markdownlint-cli
42-
rev: v0.43.0
42+
rev: v0.45.0
4343
hooks:
4444
- id: markdownlint
4545
args: ["--fix"]

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ PyIceberg is a Python library for programmatic access to Iceberg table metadata
2121

2222
The documentation is available at [https://py.iceberg.apache.org/](https://py.iceberg.apache.org/).
2323

24-
# Get in Touch
24+
## Get in Touch
2525

2626
- [Iceberg community](https://iceberg.apache.org/community/)

mkdocs/docs/how-to-release.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ Ensure to update the `PYICEBERG_VERSION` in the [Dockerfile](https://github.com/
397397

398398
### Set up GPG key and Upload to Apache Iceberg KEYS file
399399

400-
To set up GPG key locally, see the instructions [here](http://www.apache.org/dev/openpgp.html#key-gen-generate-key).
400+
To set up GPG key locally, see the [instructions](http://www.apache.org/dev/openpgp.html#key-gen-generate-key).
401401

402402
To install gpg on an M1-based Mac, a couple of additional steps are required: <https://gist.github.com/phortuin/cf24b1cca3258720c71ad42977e1ba57>.
403403

mkdocs/docs/verify-release.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ make test-coverage
113113

114114
This will spin up Docker containers to facilitate running test coverage.
115115

116-
# Cast the vote
116+
## Cast the vote
117117

118118
Votes are cast by replying to the release candidate announcement email on the dev mailing list with either `+1`, `0`, or `-1`. For example:
119119

pyiceberg/io/pyarrow.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,7 @@ def visit_uuid(self, _: UUIDType) -> pa.DataType:
779779
return pa.uuid()
780780

781781
def visit_unknown(self, _: UnknownType) -> pa.DataType:
782+
"""Type `UnknownType` can be promoted to any primitive type in V3+ tables per the Iceberg spec."""
782783
return pa.null()
783784

784785
def visit_binary(self, _: BinaryType) -> pa.DataType:
@@ -1358,6 +1359,8 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
13581359
primitive = cast(pa.FixedSizeBinaryType, primitive)
13591360
return FixedType(primitive.byte_width)
13601361
elif pa.types.is_null(primitive):
1362+
# PyArrow null type (pa.null()) is converted to Iceberg UnknownType
1363+
# UnknownType can be promoted to any primitive type in V3+ tables per the Iceberg spec
13611364
return UnknownType()
13621365
elif isinstance(primitive, pa.UuidType):
13631366
return UUIDType()

pyiceberg/schema.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1692,6 +1692,15 @@ def _(file_type: FixedType, read_type: IcebergType) -> IcebergType:
16921692
raise ResolveError(f"Cannot promote {file_type} to {read_type}")
16931693

16941694

1695+
@promote.register(UnknownType)
1696+
def _(file_type: UnknownType, read_type: IcebergType) -> IcebergType:
1697+
# Per V3 Spec, "Unknown" can be promoted to any Primitive type
1698+
if isinstance(read_type, PrimitiveType):
1699+
return read_type
1700+
else:
1701+
raise ResolveError(f"Cannot promote {file_type} to {read_type}")
1702+
1703+
16951704
def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None:
16961705
"""
16971706
Check if the `provided_schema` is compatible with `requested_schema`.
@@ -1761,7 +1770,15 @@ def _is_field_compatible(self, lhs: NestedField) -> bool:
17611770
self.rich_table.add_row("✅", str(lhs), str(rhs))
17621771
return True
17631772
except ResolveError:
1764-
self.rich_table.add_row("❌", str(lhs), str(rhs))
1773+
# UnknownType can only be promoted to Primitive types
1774+
if isinstance(rhs.field_type, UnknownType):
1775+
if not isinstance(lhs.field_type, PrimitiveType):
1776+
error_msg = f"Null type (UnknownType) cannot be promoted to non-primitive type {lhs.field_type}. UnknownType can only be promoted to primitive types (string, int, boolean, etc.) in V3+ tables."
1777+
else:
1778+
error_msg = f"Null type (UnknownType) cannot be promoted to {lhs.field_type}. This may be due to table format version limitations (V1/V2 tables don't support UnknownType promotion)."
1779+
self.rich_table.add_row("❌", str(lhs), f"{str(rhs)} - {error_msg}")
1780+
else:
1781+
self.rich_table.add_row("❌", str(lhs), str(rhs))
17651782
return False
17661783

17671784
def schema(self, schema: Schema, struct_result: Callable[[], bool]) -> bool:

pyiceberg/table/inspect.py

Lines changed: 60 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
2121

2222
from pyiceberg.conversions import from_bytes
23-
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
23+
from pyiceberg.manifest import DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
2424
from pyiceberg.partitioning import PartitionSpec
2525
from pyiceberg.table.snapshots import Snapshot, ancestors_of
2626
from pyiceberg.types import PrimitiveType
@@ -288,64 +288,86 @@ def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
288288

289289
table_schema = pa.unify_schemas([partitions_schema, table_schema])
290290

291-
def update_partitions_map(
292-
partitions_map: Dict[Tuple[str, Any], Any],
293-
file: DataFile,
294-
partition_record_dict: Dict[str, Any],
295-
snapshot: Optional[Snapshot],
296-
) -> None:
291+
snapshot = self._get_snapshot(snapshot_id)
292+
executor = ExecutorFactory.get_or_create()
293+
local_partitions_maps = executor.map(self._process_manifest, snapshot.manifests(self.tbl.io))
294+
295+
partitions_map: Dict[Tuple[str, Any], Any] = {}
296+
for local_map in local_partitions_maps:
297+
for partition_record_key, partition_row in local_map.items():
298+
if partition_record_key not in partitions_map:
299+
partitions_map[partition_record_key] = partition_row
300+
else:
301+
existing = partitions_map[partition_record_key]
302+
existing["record_count"] += partition_row["record_count"]
303+
existing["file_count"] += partition_row["file_count"]
304+
existing["total_data_file_size_in_bytes"] += partition_row["total_data_file_size_in_bytes"]
305+
existing["position_delete_record_count"] += partition_row["position_delete_record_count"]
306+
existing["position_delete_file_count"] += partition_row["position_delete_file_count"]
307+
existing["equality_delete_record_count"] += partition_row["equality_delete_record_count"]
308+
existing["equality_delete_file_count"] += partition_row["equality_delete_file_count"]
309+
310+
if partition_row["last_updated_at"] and (
311+
not existing["last_updated_at"] or partition_row["last_updated_at"] > existing["last_updated_at"]
312+
):
313+
existing["last_updated_at"] = partition_row["last_updated_at"]
314+
existing["last_updated_snapshot_id"] = partition_row["last_updated_snapshot_id"]
315+
316+
return pa.Table.from_pylist(
317+
partitions_map.values(),
318+
schema=table_schema,
319+
)
320+
321+
def _process_manifest(self, manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
322+
partitions_map: Dict[Tuple[str, Any], Any] = {}
323+
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
324+
partition = entry.data_file.partition
325+
partition_record_dict = {
326+
field.name: partition[pos]
327+
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
328+
}
329+
entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
330+
297331
partition_record_key = _convert_to_hashable_type(partition_record_dict)
298332
if partition_record_key not in partitions_map:
299333
partitions_map[partition_record_key] = {
300334
"partition": partition_record_dict,
301-
"spec_id": file.spec_id,
335+
"spec_id": entry.data_file.spec_id,
302336
"record_count": 0,
303337
"file_count": 0,
304338
"total_data_file_size_in_bytes": 0,
305339
"position_delete_record_count": 0,
306340
"position_delete_file_count": 0,
307341
"equality_delete_record_count": 0,
308342
"equality_delete_file_count": 0,
309-
"last_updated_at": snapshot.timestamp_ms if snapshot else None,
310-
"last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
343+
"last_updated_at": entry_snapshot.timestamp_ms if entry_snapshot else None,
344+
"last_updated_snapshot_id": entry_snapshot.snapshot_id if entry_snapshot else None,
311345
}
312346

313347
partition_row = partitions_map[partition_record_key]
314348

315-
if snapshot is not None:
316-
if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
317-
partition_row["last_updated_at"] = snapshot.timestamp_ms
318-
partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id
349+
if entry_snapshot is not None:
350+
if (
351+
partition_row["last_updated_at"] is None
352+
or partition_row["last_updated_snapshot_id"] < entry_snapshot.timestamp_ms
353+
):
354+
partition_row["last_updated_at"] = entry_snapshot.timestamp_ms
355+
partition_row["last_updated_snapshot_id"] = entry_snapshot.snapshot_id
319356

320-
if file.content == DataFileContent.DATA:
321-
partition_row["record_count"] += file.record_count
357+
if entry.data_file.content == DataFileContent.DATA:
358+
partition_row["record_count"] += entry.data_file.record_count
322359
partition_row["file_count"] += 1
323-
partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
324-
elif file.content == DataFileContent.POSITION_DELETES:
325-
partition_row["position_delete_record_count"] += file.record_count
360+
partition_row["total_data_file_size_in_bytes"] += entry.data_file.file_size_in_bytes
361+
elif entry.data_file.content == DataFileContent.POSITION_DELETES:
362+
partition_row["position_delete_record_count"] += entry.data_file.record_count
326363
partition_row["position_delete_file_count"] += 1
327-
elif file.content == DataFileContent.EQUALITY_DELETES:
328-
partition_row["equality_delete_record_count"] += file.record_count
364+
elif entry.data_file.content == DataFileContent.EQUALITY_DELETES:
365+
partition_row["equality_delete_record_count"] += entry.data_file.record_count
329366
partition_row["equality_delete_file_count"] += 1
330367
else:
331-
raise ValueError(f"Unknown DataFileContent ({file.content})")
332-
333-
partitions_map: Dict[Tuple[str, Any], Any] = {}
334-
snapshot = self._get_snapshot(snapshot_id)
335-
for manifest in snapshot.manifests(self.tbl.io):
336-
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
337-
partition = entry.data_file.partition
338-
partition_record_dict = {
339-
field.name: partition[pos]
340-
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
341-
}
342-
entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
343-
update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
368+
raise ValueError(f"Unknown DataFileContent ({entry.data_file.content})")
344369

345-
return pa.Table.from_pylist(
346-
partitions_map.values(),
347-
schema=table_schema,
348-
)
370+
return partitions_map
349371

350372
def _get_manifests_schema(self) -> "pa.Schema":
351373
import pyarrow as pa

tests/test_schema.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
TimestampType,
5656
TimestamptzType,
5757
TimeType,
58+
UnknownType,
5859
UUIDType,
5960
)
6061

@@ -920,6 +921,36 @@ def test_promotion(file_type: IcebergType, read_type: IcebergType) -> None:
920921
promote(file_type, read_type)
921922

922923

924+
def test_unknown_type_promotion_to_primitive() -> None:
925+
"""Test that UnknownType can be promoted to primitive types (V3+ behavior)"""
926+
unknown_type = UnknownType()
927+
928+
assert promote(unknown_type, StringType()) == StringType()
929+
assert promote(unknown_type, IntegerType()) == IntegerType()
930+
assert promote(unknown_type, BooleanType()) == BooleanType()
931+
assert promote(unknown_type, FloatType()) == FloatType()
932+
933+
934+
def test_unknown_type_promotion_to_non_primitive_raises_resolve_error() -> None:
935+
"""Test that UnknownType cannot be promoted to non-primitive types and raises ResolveError"""
936+
unknown_type = UnknownType()
937+
938+
with pytest.raises(ResolveError) as exc_info:
939+
promote(unknown_type, ListType(element_id=1, element_type=StringType(), element_required=False))
940+
941+
assert "Cannot promote unknown to list<string>" in str(exc_info.value)
942+
943+
with pytest.raises(ResolveError) as exc_info:
944+
promote(unknown_type, MapType(key_id=1, key_type=StringType(), value_id=2, value_type=StringType(), value_required=False))
945+
946+
assert "Cannot promote unknown to map<string, string>" in str(exc_info.value)
947+
948+
with pytest.raises(ResolveError) as exc_info:
949+
promote(unknown_type, StructType(NestedField(field_id=1, name="field", field_type=StringType(), required=False)))
950+
951+
assert "Cannot promote unknown to struct<1: field: optional string>" in str(exc_info.value)
952+
953+
923954
@pytest.fixture()
924955
def primitive_fields() -> List[NestedField]:
925956
return [

0 commit comments

Comments
 (0)