Skip to content

Commit 9d77f3f

Browse files
committed
chore: determine if a sort order is newly added. If so, set the last assigned id, else set a previous sort order as the default
1 parent d99dfdd commit 9d77f3f

1 file changed

Lines changed: 22 additions & 7 deletions

File tree

pyiceberg/table/update/sorting.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
from typing import TYPE_CHECKING, Any, List, Tuple
19+
from typing import TYPE_CHECKING, Any, List, Tuple, Optional
2020

21-
from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder
21+
from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder, INITIAL_SORT_ORDER_ID, UNSORTED_SORT_ORDER
2222
from pyiceberg.table.update import (
2323
AddSortOrderUpdate,
2424
AssertDefaultSortOrderId,
@@ -36,16 +36,15 @@
3636

3737
class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]):
3838
_transaction: Transaction
39-
_last_assigned_order_id: int
39+
_last_assigned_order_id: Optional[int]
4040
_case_sensitive: bool
4141
_fields: List[SortField]
42-
_last_sort_order_id: int
4342

4443
def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None:
    """Start an empty sort-order update bound to a transaction.

    Args:
        transaction: The transaction this update belongs to; it is passed
            on to the parent initializer.
        case_sensitive: Stored on the instance as ``_case_sensitive``.
    """
    super().__init__(transaction)
    self._fields: List[SortField] = []
    self._case_sensitive: bool = case_sensitive
    # No sort order id has been assigned yet; ``_apply`` records one when it
    # builds a sort order from the pending fields.
    self._last_assigned_order_id: Optional[int] = None
4948

5049
def _column_name_to_id(self, column_name: str) -> int:
5150
"""Map the column name to the column field id."""
@@ -75,6 +74,17 @@ def _add_sort_field(
7574
)
7675
)
7776
return self
77+
78+
def _reuse_or_create_sort_order_id(self) -> int:
    """Return the id of a matching existing sort order, or the next free id.

    Returns:
        The ``order_id`` of the first sort order in the table metadata whose
        fields equal the pending fields. When none matches, one more than
        the highest existing ``order_id``, but never less than
        ``INITIAL_SORT_ORDER_ID``.
    """
    existing_sort_orders = self._transaction.table_metadata.sort_orders

    # Reuse an existing sort order when its fields are identical.
    for sort_order in existing_sort_orders:
        if sort_order.fields == self._fields:
            return sort_order.order_id

    # No match: allocate the id right after the highest one in use.
    highest_order_id = max(
        (sort_order.order_id for sort_order in existing_sort_orders),
        default=INITIAL_SORT_ORDER_ID - 1,
    )
    return max(INITIAL_SORT_ORDER_ID, highest_order_id + 1)
7888

7989
def asc(
8090
self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
@@ -100,15 +110,20 @@ def desc(
100110

101111
def _apply(self) -> SortOrder:
    """Return the sort order described by the pending sort fields.

    Returns:
        ``UNSORTED_SORT_ORDER`` when no sort field has been added; otherwise
        a ``SortOrder`` built from the pending fields, carrying the id
        returned by ``_reuse_or_create_sort_order_id``.
    """
    # An empty list is falsy, so no iterator is needed to detect it.
    if not self._fields:
        return UNSORTED_SORT_ORDER

    sort_order_id = self._reuse_or_create_sort_order_id()
    # Remember which id was chosen for this update.
    self._last_assigned_order_id = sort_order_id
    return SortOrder(*self._fields, order_id=sort_order_id)
104119

105120
def _commit(self) -> UpdatesAndRequirements:
106121
"""Apply the pending changes and commit."""
107122
new_sort_order = self._apply()
108123
requirements: Tuple[TableRequirement, ...] = ()
109124
updates: Tuple[TableUpdate, ...] = ()
110125

111-
if self._transaction.table_metadata.default_sort_order_id != new_sort_order.order_id:
126+
if self._transaction.table_metadata.default_sort_order_id != new_sort_order.order_id and self._transaction.table_metadata.sort_order_by_id(new_sort_order.order_id) is None:
112127
updates = (AddSortOrderUpdate(sort_order=new_sort_order), SetDefaultSortOrderUpdate(sort_order_id=-1))
113128
else:
114129
updates = (SetDefaultSortOrderUpdate(sort_order_id=new_sort_order.order_id),)

0 commit comments

Comments
 (0)