Skip to content

Commit 6abfb09

Browse files
authored
Merge branch 'main' into ns-timestamp
2 parents adce631 + c06e320 commit 6abfb09

10 files changed

Lines changed: 388 additions & 104 deletions

File tree

.github/workflows/pypi-build-artifacts.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ jobs:
6262
if: startsWith(matrix.os, 'ubuntu')
6363

6464
- name: Build wheels
65-
uses: pypa/cibuildwheel@v2.23.0
65+
uses: pypa/cibuildwheel@v2.23.1
6666
with:
6767
output-dir: wheelhouse
6868
config-file: "pyproject.toml"

.github/workflows/svn-build-artifacts.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ jobs:
5757
if: startsWith(matrix.os, 'ubuntu')
5858

5959
- name: Build wheels
60-
uses: pypa/cibuildwheel@v2.23.0
60+
uses: pypa/cibuildwheel@v2.23.1
6161
with:
6262
output-dir: wheelhouse
6363
config-file: "pyproject.toml"

poetry.lock

Lines changed: 90 additions & 90 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyiceberg/conversions.py

Lines changed: 218 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
- Converting partition strings to built-in python objects.
2121
- Converting a value to a byte buffer.
2222
- Converting a byte buffer to a value.
23+
- Converting a JSON single-field serialized value to and from built-in python objects.
2324
2425
Note:
2526
Conversion logic varies based on the PrimitiveType implementation. Therefore conversion functions
@@ -28,6 +29,7 @@
2829
implementations that share the same conversion logic, registrations can be stacked.
2930
"""
3031

32+
import codecs
3133
import uuid
3234
from datetime import date, datetime, time
3335
from decimal import Decimal
@@ -62,7 +64,24 @@
6264
UUIDType,
6365
strtobool,
6466
)
65-
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, datetime_to_nanos, time_to_micros
67+
from pyiceberg.utils.datetime import (
68+
date_str_to_days,
69+
date_to_days,
70+
datetime_to_micros,
71+
datetime_to_nanos,
72+
days_to_date,
73+
micros_to_time,
74+
micros_to_timestamp,
75+
micros_to_timestamptz,
76+
time_str_to_micros,
77+
time_to_micros,
78+
timestamp_to_micros,
79+
timestamptz_to_micros,
80+
to_human_day,
81+
to_human_time,
82+
to_human_timestamp,
83+
to_human_timestamptz,
84+
)
6685
from pyiceberg.utils.decimal import decimal_to_bytes, unscaled_to_decimal
6786

6887
_BOOL_STRUCT = Struct("<?")
@@ -295,7 +314,7 @@ def from_bytes(primitive_type: PrimitiveType, b: bytes) -> L: # type: ignore
295314
primitive_type (PrimitiveType): An implementation of the PrimitiveType base class.
296315
b (bytes): The bytes to convert.
297316
"""
298-
raise TypeError(f"Cannot deserialize bytes, type {primitive_type} not supported: {str(b)}")
317+
raise TypeError(f"Cannot deserialize bytes, type {primitive_type} not supported: {b!r}")
299318

300319

301320
@from_bytes.register(BooleanType)
@@ -350,3 +369,200 @@ def _(primitive_type: DecimalType, buf: bytes) -> Decimal:
350369
@from_bytes.register(UnknownType)
351370
def _(type_: UnknownType, buf: bytes) -> None:
352371
return None
372+
373+
374+
@singledispatch  # type: ignore
def to_json(primitive_type: PrimitiveType, val: Any) -> L:  # type: ignore
    """Convert built-in python values into JSON value types.

    https://iceberg.apache.org/spec/#json-single-value-serialization

    Args:
        primitive_type (PrimitiveType): An implementation of the PrimitiveType base class.
        val (Any): The arbitrary built-in value to convert into the right form.

    Raises:
        TypeError: If no serializer is registered for the given primitive type.
    """
    # Fixed message: it previously said "Cannot deserialize bytes" (copied from
    # from_bytes), but this function serializes a value into its JSON form.
    raise TypeError(f"Cannot serialize value, type {primitive_type} not supported: {val}")


@to_json.register(BooleanType)
def _(_: BooleanType, val: bool) -> bool:
    """Python bool automatically converts into a JSON bool."""
    return val


@to_json.register(IntegerType)
@to_json.register(LongType)
def _(_: Union[IntegerType, LongType], val: int) -> int:
    """Python int automatically converts to a JSON int."""
    return val


@to_json.register(DateType)
def _(_: DateType, val: Union[date, int]) -> str:
    """JSON date is string encoded (days since epoch rendered as an ISO8601 date)."""
    if isinstance(val, date):
        val = date_to_days(val)
    return to_human_day(val)


@to_json.register(TimeType)
def _(_: TimeType, val: Union[int, time]) -> str:
    """Python time or microseconds since epoch serializes into an ISO8601 time."""
    if isinstance(val, time):
        val = time_to_micros(val)
    return to_human_time(val)


@to_json.register(TimestampType)
def _(_: TimestampType, val: Union[int, datetime]) -> str:
    """Python datetime (without timezone) or microseconds since epoch serializes into an ISO8601 timestamp."""
    # Annotation fixed from PrimitiveType to TimestampType for consistency with
    # the other registrations; dispatch is driven by the register() decorator.
    if isinstance(val, datetime):
        val = datetime_to_micros(val)

    return to_human_timestamp(val)


@to_json.register(TimestamptzType)
def _(_: TimestamptzType, val: Union[int, datetime]) -> str:
    """Python datetime (with timezone) or microseconds since epoch serializes into an ISO8601 timestamp."""
    if isinstance(val, datetime):
        val = datetime_to_micros(val)
    return to_human_timestamptz(val)


@to_json.register(FloatType)
@to_json.register(DoubleType)
def _(_: Union[FloatType, DoubleType], val: float) -> float:
    """Float serializes into JSON float."""
    return val


@to_json.register(StringType)
def _(_: StringType, val: str) -> str:
    """Python string serializes into JSON string."""
    return val


@to_json.register(FixedType)
def _(t: FixedType, b: bytes) -> str:
    """Python bytes serializes into hexadecimal encoded string.

    Raises:
        ValueError: If the byte length does not match the FixedType's declared length.
    """
    if len(t) != len(b):
        raise ValueError(f"FixedType has length {len(t)}, which is different from the value: {len(b)}")

    return codecs.encode(b, "hex").decode(UTF8)


@to_json.register(BinaryType)
def _(_: BinaryType, b: bytes) -> str:
    """Python bytes serializes into hexadecimal encoded string."""
    return codecs.encode(b, "hex").decode(UTF8)


@to_json.register(DecimalType)
def _(_: DecimalType, val: Decimal) -> str:
    """Python decimal serializes into string.

    Stores the string representation of the decimal value, specifically, for
    values with a positive scale, the number of digits to the right of the
    decimal point is used to indicate scale, for values with a negative scale,
    the scientific notation is used and the exponent must equal the negated scale.
    """
    return str(val)


@to_json.register(UUIDType)
def _(_: UUIDType, val: uuid.UUID) -> str:
    """Serialize into a JSON string."""
    return str(val)
478+
479+
@singledispatch  # type: ignore
def from_json(primitive_type: PrimitiveType, val: Any) -> L:  # type: ignore
    """Convert JSON value types into built-in python values.

    https://iceberg.apache.org/spec/#json-single-value-serialization

    Args:
        primitive_type (PrimitiveType): An implementation of the PrimitiveType base class.
        val (Any): The arbitrary JSON value to convert into the right form.

    Raises:
        TypeError: If no deserializer is registered for the given primitive type.
    """
    # Fixed message: it previously said "Cannot deserialize bytes", but this
    # function deserializes JSON values, not raw bytes.
    raise TypeError(f"Cannot deserialize JSON value, type {primitive_type} not supported: {val}")


@from_json.register(BooleanType)
def _(_: BooleanType, val: bool) -> bool:
    """JSON bool automatically converts into a Python bool."""
    return val


@from_json.register(IntegerType)
@from_json.register(LongType)
def _(_: Union[IntegerType, LongType], val: int) -> int:
    """JSON int automatically converts to a Python int."""
    return val


@from_json.register(DateType)
def _(_: DateType, val: str) -> date:
    """JSON date is string encoded."""
    return days_to_date(date_str_to_days(val))


@from_json.register(TimeType)
def _(_: TimeType, val: str) -> time:
    """JSON ISO8601 string into Python time."""
    return micros_to_time(time_str_to_micros(val))


@from_json.register(TimestampType)
def _(_: TimestampType, val: str) -> datetime:
    """JSON ISO8601 string into Python datetime."""
    # Annotation fixed from PrimitiveType to TimestampType for consistency with
    # the other registrations; dispatch is driven by the register() decorator.
    return micros_to_timestamp(timestamp_to_micros(val))


@from_json.register(TimestamptzType)
def _(_: TimestamptzType, val: str) -> datetime:
    """JSON ISO8601 string into Python datetime (timezone-aware)."""
    return micros_to_timestamptz(timestamptz_to_micros(val))


@from_json.register(FloatType)
@from_json.register(DoubleType)
def _(_: Union[FloatType, DoubleType], val: float) -> float:
    """JSON float deserializes into a Python float."""
    return val


@from_json.register(StringType)
def _(_: StringType, val: str) -> str:
    """JSON string serializes into a Python string."""
    return val


@from_json.register(FixedType)
def _(t: FixedType, val: str) -> bytes:
    """JSON hexadecimal encoded string into bytes.

    Raises:
        ValueError: If the decoded byte length does not match the FixedType's declared length.
    """
    b = codecs.decode(val.encode(UTF8), "hex")

    if len(t) != len(b):
        raise ValueError(f"FixedType has length {len(t)}, which is different from the value: {len(b)}")

    return b


@from_json.register(BinaryType)
def _(_: BinaryType, val: str) -> bytes:
    """JSON hexadecimal encoded string into bytes."""
    return codecs.decode(val.encode(UTF8), "hex")


@from_json.register(DecimalType)
def _(_: DecimalType, val: str) -> Decimal:
    """Convert JSON string into a Python Decimal."""
    return Decimal(val)


@from_json.register(UUIDType)
def _(_: UUIDType, val: str) -> uuid.UUID:
    """Convert JSON string into Python UUID."""
    return uuid.UUID(val)

pyiceberg/table/__init__.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,10 +1198,11 @@ def upsert(
11981198

11991199
update_row_cnt = len(rows_to_update)
12001200

1201-
# build the match predicate filter
1202-
overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
1201+
if len(rows_to_update) > 0:
1202+
# build the match predicate filter
1203+
overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
12031204

1204-
tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)
1205+
tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)
12051206

12061207
if when_not_matched_insert_all:
12071208
expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
@@ -1211,7 +1212,8 @@ def upsert(
12111212

12121213
insert_row_cnt = len(rows_to_insert)
12131214

1214-
tx.append(rows_to_insert)
1215+
if insert_row_cnt > 0:
1216+
tx.append(rows_to_insert)
12151217

12161218
return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
12171219

pyiceberg/table/inspect.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
161161
entries = []
162162
snapshot = self._get_snapshot(snapshot_id)
163163
for manifest in snapshot.manifests(self.tbl.io):
164-
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
164+
for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=False):
165165
column_sizes = entry.data_file.column_sizes or {}
166166
value_counts = entry.data_file.value_counts or {}
167167
null_value_counts = entry.data_file.null_value_counts or {}

pyproject.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ thrift-sasl = { version = ">=0.4.3", optional = true }
8787
pytest = "7.4.4"
8888
pytest-checkdocs = "2.13.0"
8989
pytest-lazy-fixture = "0.6.3"
90-
pre-commit = "4.1.0"
90+
pre-commit = "4.2.0"
9191
fastavro = "1.10.0"
9292
coverage = { version = "^7.4.2", extras = ["toml"] }
9393
requests-mock = "1.12.1"
@@ -103,14 +103,14 @@ docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/924
103103
[tool.poetry.group.docs.dependencies]
104104
# for mkdocs
105105
mkdocs = "1.6.1"
106-
griffe = "1.6.0"
106+
griffe = "1.6.1"
107107
jinja2 = "3.1.6"
108108
mkdocstrings = "0.29.0"
109-
mkdocstrings-python = "1.16.5"
110-
mkdocs-literate-nav = "0.6.1"
109+
mkdocstrings-python = "1.16.6"
110+
mkdocs-literate-nav = "0.6.2"
111111
mkdocs-autorefs = "1.4.1"
112112
mkdocs-gen-files = "0.5.0"
113-
mkdocs-material = "9.6.8"
113+
mkdocs-material = "9.6.9"
114114
mkdocs-material-extensions = "1.3.1"
115115
mkdocs-section-index = "0.3.9"
116116

tests/integration/test_inspect_table.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ def test_inspect_entries(
164164

165165
# Write some data
166166
tbl.append(arrow_table_with_null)
167+
# Generate a DELETE entry
168+
tbl.overwrite(arrow_table_with_null)
167169

168170
def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
169171
assert df.column_names == [
@@ -185,6 +187,8 @@ def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> Non
185187

186188
lhs = df.to_pandas()
187189
rhs = spark_df.toPandas()
190+
assert len(lhs) == len(rhs)
191+
188192
for column in df.column_names:
189193
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
190194
if column == "data_file":

tests/table/test_upsert.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from pyiceberg.io.pyarrow import schema_to_pyarrow
2929
from pyiceberg.schema import Schema
3030
from pyiceberg.table import UpsertResult
31+
from pyiceberg.table.snapshots import Operation
3132
from pyiceberg.table.upsert_util import create_match_filter
3233
from pyiceberg.types import IntegerType, NestedField, StringType
3334
from tests.catalog.test_base import InMemoryCatalog, Table
@@ -368,9 +369,21 @@ def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
368369
)
369370
upd = tbl.upsert(df)
370371

372+
expected_operations = [Operation.APPEND, Operation.OVERWRITE, Operation.APPEND, Operation.APPEND]
373+
371374
assert upd.rows_updated == 1
372375
assert upd.rows_inserted == 1
373376

377+
assert [snap.summary.operation for snap in tbl.snapshots() if snap.summary is not None] == expected_operations
378+
379+
# This should be a no-op
380+
upd = tbl.upsert(df)
381+
382+
assert upd.rows_updated == 0
383+
assert upd.rows_inserted == 0
384+
385+
assert [snap.summary.operation for snap in tbl.snapshots() if snap.summary is not None] == expected_operations
386+
374387

375388
def test_upsert_into_empty_table(catalog: Catalog) -> None:
376389
identifier = "default.test_upsert_into_empty_table"

0 commit comments

Comments
 (0)