
Commit bb4ec89

docs: improve Write to a table section (#1008)
- Add dedicated subsections for append, overwrite, delete, dynamic partition overwrite, upsert, and Transaction API
- Document overwrite_filter with worked example and note on accepted expression types
- Document branch parameter on append and overwrite
- Add case_sensitive note to overwrite and delete
- Add merge-on-read warning to delete (falls back to copy-on-write)
- Add Transaction API section showing atomic data + metadata changes
- Fix transaction rollback language: catalog commit is aborted but object-storage files are not automatically cleaned up
- Remove redundant top-level Snapshot properties section
1 parent 721c5aa commit bb4ec89

1 file changed: mkdocs/docs/api.md (109 additions & 85 deletions)
@@ -272,10 +272,15 @@ catalog.purge_table("docs_example.bids")
 
 ## Write to a table
 
-Reading and writing is being done using [Apache Arrow](https://arrow.apache.org/). Arrow is an in-memory columnar format for fast data interchange and in-memory analytics. Let's consider the following Arrow Table:
+PyIceberg supports several write modes: [append](#append), [overwrite](#overwrite), [delete](#delete), [dynamic partition overwrite](#dynamic-partition-overwrite), and [upsert](#upsert). All writes use [Apache Arrow](https://arrow.apache.org/) as the in-memory format. Writes can be issued directly on the `Table` object or grouped together using the [Transaction API](#transaction-api).
+
+To set up a table for the examples below:
 
 ```python
 import pyarrow as pa
+from pyiceberg.catalog import load_catalog
+
+catalog = load_catalog("default")
 
 df = pa.Table.from_pylist(
     [
@@ -285,19 +290,11 @@ df = pa.Table.from_pylist(
         {"city": "Paris", "lat": 48.864716, "long": 2.349014},
     ],
 )
-```
-
-Next, create a table using the Arrow schema:
-
-```python
-from pyiceberg.catalog import load_catalog
-
-catalog = load_catalog("default")
 
 tbl = catalog.create_table("default.cities", schema=df.schema)
 ```
 
-Next, write the data to the table. Both `append` and `overwrite` produce the same result, since the table is empty on creation:
+### Append
 
 <!-- prettier-ignore-start -->
 
@@ -306,15 +303,13 @@ Next, write the data to the table. Both `append` and `overwrite` produce the sam
 
 <!-- prettier-ignore-end -->
 
+Use `append` to add new rows to an existing table without modifying any existing data:
+
 ```python
 tbl.append(df)
-
-# or
-
-tbl.overwrite(df)
 ```
 
-Now, the data is written to the table, and the table can be read using `tbl.scan().to_arrow()`:
+After the first append, `tbl.scan().to_arrow()` returns:
 
 ```python
 pyarrow.Table
@@ -327,15 +322,15 @@ lat: [[52.371807,37.773972,53.11254,48.864716]]
 long: [[4.896029,-122.431297,6.0989,2.349014]]
 ```
 
-If we want to add more data, we can use `.append()` again:
+Each call to `append` produces a new [Parquet file](https://parquet.apache.org/). Calling `append` a second time adds another batch of rows:
 
 ```python
 tbl.append(pa.Table.from_pylist(
     [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
 ))
 ```
 
-When reading the table `tbl.scan().to_arrow()` you can see that `Groningen` is now also part of the table:
+The nested lists in `tbl.scan().to_arrow()` reflect the separate Arrow buffers from each write:
 
 ```python
 pyarrow.Table
@@ -348,98 +343,113 @@ lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
 long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
 ```
 
-The nested lists indicate the different Arrow buffers. Each of the writes produce a [Parquet file](https://parquet.apache.org/) where each [row group](https://parquet.apache.org/docs/concepts/) translates into an Arrow buffer. In the case where the table is large, PyIceberg also allows the option to stream the buffers using the Arrow [RecordBatchReader](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html), avoiding pulling everything into memory right away:
+To avoid type inconsistencies, convert the Iceberg table schema to Arrow before writing:
 
 ```python
-for buf in tbl.scan().to_arrow_batch_reader():
-    print(f"Buffer contains {len(buf)} rows")
+df = pa.Table.from_pylist(
+    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}], schema=tbl.schema().as_arrow()
+)
+
+tbl.append(df)
 ```
 
-To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
+Optionally, you can attach custom properties to the snapshot created by `append`, or target a specific branch:
 
 ```python
-df = pa.Table.from_pylist(
-    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}], schema=table.schema().as_arrow()
-)
+tbl.append(df, snapshot_properties={"owner": "etl-job", "run_id": "abc123"})
 
-tbl.append(df)
+# Write to a branch instead of main
+tbl.append(df, branch="staging")
 ```
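+
+Snapshot properties end up in the snapshot summary, so the write can be verified afterwards; a quick check of the `snapshot_properties` call above (before the branch write):
+
+```python
+assert tbl.metadata.snapshots[-1].summary["owner"] == "etl-job"
+```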
 
-You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`. This will use the Iceberg metadata to only open up the Parquet files that contain relevant information.
+### Overwrite
+
+`overwrite` replaces data in the table with new data. When called without an `overwrite_filter`, it behaves like a full table replacement: existing data is deleted and the new data is written. When the table is empty, `overwrite` and `append` produce the same result.
 
 ```python
-tbl.delete(delete_filter="city == 'Paris'")
+tbl.overwrite(df)
 ```
 
-In the above example, any records where the city field value equals to `Paris` will be deleted. Running `tbl.scan().to_arrow()` will now yield:
+#### Partial overwrite with `overwrite_filter`
+
+Pass an `overwrite_filter` to delete only the rows that match the predicate before appending the new data. This is useful for replacing a specific subset of rows.
+
+For example, to replace the record for `Paris` with a record for `New York`:
+
+```python
+from pyiceberg.expressions import EqualTo
+
+df_new = pa.Table.from_pylist(
+    [{"city": "New York", "lat": 40.7128, "long": 74.0060}]
+)
+tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"))
+```
+
+After the overwrite, `tbl.scan().to_arrow()` yields:
 
 ```python
 pyarrow.Table
-city: string
+city: large_string
 lat: double
 long: double
 ----
-city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
-lat: [[52.371807,37.773972,53.11254],[53.21917]]
-long: [[4.896029,-122.431297,6.0989],[6.56667]]
+city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
+lat: [[40.7128],[52.371807,37.773972,53.11254]]
+long: [[74.006],[4.896029,-122.431297,6.0989]]
 ```
 
-In the case of `tbl.delete(delete_filter="city == 'Groningen'")`, the whole Parquet file will be dropped without checking it contents, since from the Iceberg metadata PyIceberg can derive that all the content in the file matches the predicate.
+The `overwrite_filter` accepts both expression objects (e.g., `EqualTo`, `GreaterThan`) and SQL-style string predicates (e.g., `"city == 'Paris'"`). Field names in the filter are matched case-sensitively by default; pass `case_sensitive=False` to change this.
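+
+Both filter forms below are equivalent to the `EqualTo` example above; a small sketch of the accepted types (the second call assumes the column is addressed as `City`):
+
+```python
+# SQL-style string predicate, equivalent to EqualTo("city", "Paris")
+tbl.overwrite(df_new, overwrite_filter="city == 'Paris'")
+
+# With case_sensitive=False, the field name "City" resolves to the "city" column
+tbl.overwrite(df_new, overwrite_filter="City == 'Paris'", case_sensitive=False)
+```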
 
-### Partial overwrites
-
-When using the `overwrite` API, you can use an `overwrite_filter` to delete data that matches the filter before appending new data into the table. For example, consider the following Iceberg table:
+Optionally, you can also set snapshot properties or target a branch:
 
 ```python
-import pyarrow as pa
-df = pa.Table.from_pylist(
-    [
-        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
-        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
-        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
-        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
-    ],
-)
+tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"), snapshot_properties={"owner": "etl-job"})
 
-from pyiceberg.catalog import load_catalog
-catalog = load_catalog("default")
+# Write to a branch instead of main
+tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"), branch="staging")
+```
 
-tbl = catalog.create_table("default.cities", schema=df.schema)
+### Delete
 
-tbl.append(df)
-```
+Use `delete` to remove rows matching a predicate without writing new data. PyIceberg uses Iceberg metadata to prune which Parquet files need to be opened, so only relevant files are read. Field names in the filter are matched case-sensitively by default; pass `case_sensitive=False` to change this.
+
+<!-- prettier-ignore-start -->
 
-You can overwrite the record of `Paris` with a record of `New York`:
+!!! warning "Merge-on-read not yet supported"
+    If the table property `write.delete.mode` is set to `merge-on-read`, PyIceberg will fall back to copy-on-write and emit a warning. All deletes currently rewrite Parquet files.
+
+<!-- prettier-ignore-end -->
 
 ```python
-from pyiceberg.expressions import EqualTo
-df = pa.Table.from_pylist(
-    [
-        {"city": "New York", "lat": 40.7128, "long": 74.0060},
-    ]
-)
-tbl.overwrite(df, overwrite_filter=EqualTo('city', "Paris"))
+tbl.delete(delete_filter="city == 'Paris'")
 ```
 
-This produces the following result with `tbl.scan().to_arrow()`:
+Any rows where `city` equals `Paris` are removed. Running `tbl.scan().to_arrow()` afterwards yields:
 
 ```python
 pyarrow.Table
-city: large_string
+city: string
 lat: double
 long: double
 ----
-city: [["New York"],["Amsterdam","San Francisco","Drachten"]]
-lat: [[40.7128],[52.371807,37.773972,53.11254]]
-long: [[74.006],[4.896029,-122.431297,6.0989]]
+city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
+lat: [[52.371807,37.773972,53.11254],[53.21917]]
+long: [[4.896029,-122.431297,6.0989],[6.56667]]
 ```
 
-If the PyIceberg table is partitioned, you can use `tbl.dynamic_partition_overwrite(df)` to replace the existing partitions with new ones provided in the dataframe. The partitions to be replaced are detected automatically from the provided arrow table.
-For example, with an iceberg table with a partition specified on `"city"` field:
+When the predicate matches all rows in a Parquet file (e.g., `tbl.delete(delete_filter="city == 'Groningen'")`), PyIceberg drops the entire file without scanning its contents.
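+
+The delete filter likewise accepts expression objects, and field-name matching can be relaxed; an illustrative sketch:
+
+```python
+from pyiceberg.expressions import EqualTo
+
+# Expression object instead of a string predicate
+tbl.delete(delete_filter=EqualTo("city", "Drachten"))
+
+# With case_sensitive=False, "City" resolves to the "city" column
+tbl.delete(delete_filter="City == 'Groningen'", case_sensitive=False)
+```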
+
+### Dynamic Partition Overwrite
+
+For partitioned tables, `dynamic_partition_overwrite` replaces only the partitions present in the provided Arrow table. The partitions to overwrite are detected automatically; you do not need to specify them explicitly.
+
+First, create a partitioned table:
 
 ```python
 from pyiceberg.schema import Schema
 from pyiceberg.types import DoubleType, NestedField, StringType
+from pyiceberg.partitioning import PartitionSpec, PartitionField
+from pyiceberg.transforms import IdentityTransform
 
 schema = Schema(
     NestedField(1, "city", StringType(), required=False),
@@ -454,23 +464,21 @@ tbl = catalog.create_table(
 )
 ```
 
-And we want to overwrite the data for the partition of `"Paris"`:
+Write some initial data:
 
 ```python
-import pyarrow as pa
-
 df = pa.Table.from_pylist(
     [
         {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
-        {"city": "Paris", "lat": -48.864716, "long": -2.349014},
+        {"city": "Paris", "lat": -48.864716, "long": -2.349014},  # incorrect coordinates
     ],
 )
 tbl.append(df)
 ```
 
-Then we can call `dynamic_partition_overwrite` with this arrow table:
+To correct only the `Paris` partition:
 
 ```python
 df_corrected = pa.Table.from_pylist([
@@ -479,7 +487,7 @@ df_corrected = pa.Table.from_pylist([
 tbl.dynamic_partition_overwrite(df_corrected)
 ```
 
-This produces the following result with `tbl.scan().to_arrow()`:
+Only the `Paris` partition is replaced. All other partitions remain unchanged. `tbl.scan().to_arrow()` now yields:
 
 ```python
 pyarrow.Table
@@ -492,6 +500,35 @@ lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
 long: [[2.349014],[4.896029],[6.0989],[-122.431297]]
 ```
 
+### Transaction API
+
+All write operations can also be issued as part of a transaction, which lets you combine multiple mutations (schema changes, property updates, and data writes) into a single atomic commit.
+
+```python
+with tbl.transaction() as txn:
+    txn.append(df)
+```
+
+You can combine multiple write operations in one transaction:
+
+```python
+with tbl.transaction() as txn:
+    txn.delete("city == 'Paris'")
+    txn.append(pa.Table.from_pylist([{"city": "New York", "lat": 40.7128, "long": 74.0060}]))
+```
+
+You can also mix data writes with metadata changes in the same transaction:
+
+```python
+from pyiceberg.types import LongType
+
+with tbl.transaction() as txn:
+    txn.append(df)
+    with txn.update_schema() as update_schema:
+        update_schema.add_column("population", LongType())
+    txn.set_properties(owner="data-team")
+```
+
+If an exception is raised inside the `with` block, no snapshot is committed to the catalog. Note that data files already written to object storage are not automatically cleaned up in that case.
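+
+A sketch of that rollback behavior:
+
+```python
+try:
+    with tbl.transaction() as txn:
+        txn.append(df)
+        raise RuntimeError("abort before commit")  # nothing is committed
+except RuntimeError:
+    pass
+
+# The table is unchanged; the aborted transaction produced no new snapshot.
+# Any data files the transaction already wrote remain in object storage.
+```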
+
 ### Upsert
 
 PyIceberg supports upsert operations, meaning that it is able to merge an Arrow table into an Iceberg table. Rows are considered the same based on the [identifier field](https://iceberg.apache.org/spec/?column-projection#identifier-field-ids). If a row is already in the table, it will update that row. If a row cannot be found, it will insert that new row.
@@ -553,6 +590,7 @@ upd = tbl.upsert(df)
 
 assert upd.rows_updated == 1
 assert upd.rows_inserted == 1
+# Paris was already up-to-date; PyIceberg skips it silently
 ```
 
 PyIceberg will automatically detect which rows need to be updated, inserted or can simply be ignored.
@@ -1370,20 +1408,6 @@ table = table.transaction().remove_properties("abc").commit_transaction()
 assert table.properties == {}
 ```
 
-## Snapshot properties
-
-Optionally, Snapshot properties can be set while writing to a table using `append` or `overwrite` API:
-
-```python
-tbl.append(df, snapshot_properties={"abc": "def"})
-
-# or
-
-tbl.overwrite(df, snapshot_properties={"abc": "def"})
-
-assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
-```
-
 ## Snapshot Management
 
 Manage snapshots with operations through the `Table` API:
