Skip to content

Commit 65fe36d

Browse files
committed
Filter rows to insert on each iteration instead of keeping a list of all filter expressions. This prevents memory pressure caused by accumulating large filter expressions.
1 parent 5bdb0b8 commit 65fe36d

1 file changed

Lines changed: 3 additions & 9 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
import functools
2019
import itertools
21-
import operator
2220
import os
2321
import uuid
2422
import warnings
@@ -785,7 +783,7 @@ def upsert(
785783

786784
batches_to_overwrite = []
787785
overwrite_predicates = []
788-
insert_filters = []
786+
rows_to_insert = df
789787

790788
for batch in matched_iceberg_record_batches:
791789
rows = pa.Table.from_batches([batch])
@@ -808,7 +806,8 @@ def upsert(
808806
expr_match_bound = bind(self.table_metadata.schema(), expr_match, case_sensitive=case_sensitive)
809807
expr_match_arrow = expression_to_pyarrow(expr_match_bound)
810808

811-
insert_filters.append(~expr_match_arrow)
809+
# Filter rows per batch.
810+
rows_to_insert = rows_to_insert.filter(~expr_match_arrow)
812811

813812
update_row_cnt = 0
814813
insert_row_cnt = 0
@@ -822,11 +821,6 @@ def upsert(
822821
)
823822

824823
if when_not_matched_insert_all:
825-
if insert_filters:
826-
rows_to_insert = df.filter(functools.reduce(operator.and_, insert_filters))
827-
else:
828-
rows_to_insert = df
829-
830824
insert_row_cnt = len(rows_to_insert)
831825
if rows_to_insert:
832826
self.append(rows_to_insert)

0 commit comments

Comments (0)