
Commit e4463df

Fixed linter and code errors

1 parent 53a7f84 commit e4463df
2 files changed: 34 additions & 15 deletions


pyiceberg/table/__init__.py

Lines changed: 32 additions & 13 deletions
@@ -398,7 +398,7 @@ def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanE
             expr = Or(expr, match_partition_expression)
         return expr
 
-    def _append_snapshot_producer(self, snapshot_properties: Dict[str, str],branch:str) -> _FastAppendFiles:
+    def _append_snapshot_producer(self, snapshot_properties: Dict[str, str], branch: str) -> _FastAppendFiles:
         """Determine the append type based on table properties.
 
         Args:
@@ -411,7 +411,7 @@ def _append_snapshot_producer(self, snapshot_properties: Dict[str, str],branch:s
             TableProperties.MANIFEST_MERGE_ENABLED,
             TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
         )
-        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties,branch=branch)
+        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch)
         return update_snapshot.merge_append() if manifest_merge_enabled else update_snapshot.fast_append()
 
     def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
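
Note: the producer chosen above depends on the commit.manifest-merge.enabled table property (TableProperties.MANIFEST_MERGE_ENABLED); when it is true, branch appends go through merge_append(), otherwise fast_append(). A minimal sketch of flipping that property follows; the catalog name "default" and the table "default.events" are assumptions, not part of this commit:

    # Hedged sketch: enable manifest merging so _append_snapshot_producer
    # returns a merge-append producer. Names below are assumptions.
    from pyiceberg.catalog import load_catalog

    tbl = load_catalog("default").load_table("default.events")
    with tbl.transaction() as tx:
        # TableProperties.MANIFEST_MERGE_ENABLED == "commit.manifest-merge.enabled"
        tx.set_properties({"commit.manifest-merge.enabled": "true"})
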
@@ -478,7 +478,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
             self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
         )
 
-        with self._append_snapshot_producer(snapshot_properties,branch=branch) as append_files:
+        with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
             # skip writing data files if the dataframe is empty
             if df.shape[0] > 0:
                 data_files = list(
@@ -489,7 +489,9 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
 
-    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def dynamic_partition_overwrite(
+        self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH
+    ) -> None:
         """
         Shorthand for overwriting existing partitions with a PyArrow table.
 
@@ -500,6 +502,7 @@ def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[st
         Args:
             df: The Arrow dataframe that will be used to overwrite the table
             snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch Reference to run the dynamic partition overwrite operation
         """
         try:
             import pyarrow as pa
@@ -540,7 +543,7 @@ def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[st
         delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
         self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
 
-        with self._append_snapshot_producer(snapshot_properties) as append_files:
+        with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
             append_files.commit_uuid = append_snapshot_commit_uuid
             for data_file in data_files:
                 append_files.append_data_file(data_file)
@@ -593,9 +596,14 @@ def overwrite(
 
         if overwrite_filter != AlwaysFalse():
             # Only delete when the filter is != AlwaysFalse
-            self.delete(delete_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch)
+            self.delete(
+                delete_filter=overwrite_filter,
+                case_sensitive=case_sensitive,
+                snapshot_properties=snapshot_properties,
+                branch=branch,
+            )
 
-        with self._append_snapshot_producer(snapshot_properties,branch=branch) as append_files:
+        with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
             # skip writing data files if the dataframe is empty
             if df.shape[0] > 0:
                 data_files = _dataframe_to_data_files(
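
The overwrite path now performs a branch-scoped delete (skipped when overwrite_filter is AlwaysFalse) followed by an append on the same branch, so both snapshots land on the branch rather than main. A hedged usage sketch; the table, branch, and dataframe below are assumptions:

    # Hedged sketch of overwrite() against a branch after this change.
    import pyarrow as pa
    from pyiceberg.catalog import load_catalog

    tbl = load_catalog("default").load_table("default.events")  # assumed names
    df = pa.table({"id": [10, 11], "region": ["eu", "eu"]})

    # Deletes rows matching the filter on the "audit" branch, then appends df;
    # with the default AlwaysTrue filter the branch contents are replaced wholesale.
    tbl.overwrite(df, overwrite_filter="region = 'eu'", branch="audit")
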
@@ -640,7 +648,7 @@ def delete(
         if isinstance(delete_filter, str):
             delete_filter = _parse_row_filter(delete_filter)
 
-        with self.update_snapshot(snapshot_properties=snapshot_properties,branch=branch).delete() as delete_snapshot:
+        with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).delete() as delete_snapshot:
             delete_snapshot.delete_by_predicate(delete_filter, case_sensitive)
 
         # Check if there are any files that require an actual rewrite of a data file
@@ -690,7 +698,9 @@ def delete(
             )
 
         if len(replaced_files) > 0:
-            with self.update_snapshot(snapshot_properties=snapshot_properties,branch=branch).overwrite() as overwrite_snapshot:
+            with self.update_snapshot(
+                snapshot_properties=snapshot_properties, branch=branch
+            ).overwrite() as overwrite_snapshot:
                 overwrite_snapshot.commit_uuid = commit_uuid
                 for original_data_file, replaced_data_files in replaced_files:
                     overwrite_snapshot.delete_data_file(original_data_file)
@@ -1301,16 +1311,19 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
         with self.transaction() as tx:
             tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)
 
-    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+    def dynamic_partition_overwrite(
+        self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: str = MAIN_BRANCH
+    ) -> None:
         """Shorthand for dynamic overwriting the table with a PyArrow table.
 
         Old partitions are auto detected and replaced with data files created for input arrow table.
         Args:
             df: The Arrow dataframe that will be used to overwrite the table
             snapshot_properties: Custom properties to be added to the snapshot summary
+            branch: Branch Reference to run the dynamic partition overwrite operation
         """
         with self.transaction() as tx:
-            tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties)
+            tx.dynamic_partition_overwrite(df=df, snapshot_properties=snapshot_properties, branch=branch)
 
     def overwrite(
         self,
@@ -1339,7 +1352,11 @@ def overwrite(
         """
         with self.transaction() as tx:
             tx.overwrite(
-                df=df, overwrite_filter=overwrite_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch
+                df=df,
+                overwrite_filter=overwrite_filter,
+                case_sensitive=case_sensitive,
+                snapshot_properties=snapshot_properties,
+                branch=branch,
             )
 
     def delete(
@@ -1359,7 +1376,9 @@ def delete(
             branch: Branch Reference to run the delete operation
         """
         with self.transaction() as tx:
-            tx.delete(delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch)
+            tx.delete(
+                delete_filter=delete_filter, case_sensitive=case_sensitive, snapshot_properties=snapshot_properties, branch=branch
+            )
 
     def add_files(
         self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
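
Taken together, these changes thread the branch reference through every Transaction and Table write path (append, dynamic_partition_overwrite, overwrite, delete), defaulting to MAIN_BRANCH when no branch is given. A hedged end-to-end sketch; the catalog, table, branch name, and partition layout are assumptions, and the branch must already exist on the table:

    # Hedged sketch of the branch-aware write API after this commit.
    import pyarrow as pa
    from pyiceberg.catalog import load_catalog

    tbl = load_catalog("default").load_table("default.events")  # assumed names
    df = pa.table({"id": [1, 2, 3], "region": ["eu", "eu", "us"]})

    tbl.append(df, branch="audit")                       # append lands on the branch
    tbl.delete("region = 'us'", branch="audit")          # branch-scoped delete
    tbl.dynamic_partition_overwrite(df, branch="audit")  # newly branch-aware here
    # main is untouched until the branch is published (e.g. fast-forwarded)
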

tests/integration/test_writes/test_writes.py

Lines changed: 2 additions & 2 deletions
@@ -1945,7 +1945,7 @@ def test_intertwined_branch_writes(session_catalog: Catalog, arrow_table_with_nu
 
 @pytest.mark.integration
 def test_branch_spark_write_py_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None:
-    # Intialize table with branch
+    # Initialize table with branch
     identifier = "default.test_branch_spark_write_py_read"
     tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
     branch = "existing_spark_branch"
@@ -1971,7 +1971,7 @@ def test_branch_spark_write_py_read(session_catalog: Catalog, spark: SparkSessio
 
 @pytest.mark.integration
 def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession, arrow_table_with_null: pa.Table) -> None:
-    # Intialize table with branch
+    # Initialize table with branch
     identifier = "default.test_branch_py_write_spark_read"
     tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
     branch = "existing_py_branch"
