-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtutorial_05.py
More file actions
95 lines (78 loc) · 3.03 KB
/
tutorial_05.py
File metadata and controls
95 lines (78 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# %%
"""
The SQL we just ran changed the data in the database, and we might not have been happy about that.
Maybe we decide we made a mistake in our update logic, and now we want to see which rows didn't have a
_loaded_at column.
In Iceberg, time-travel is built-in, as all we have to do is keep track of the metadata for the files.
"""
from aws_iceberg_demo.connections import get_trino_engine
import sqlalchemy as sa
import polars as pl
# Negative width: presumably lifts the character limit on printed tables
# (TODO confirm against the polars Config documentation).
pl.config.Config.set_tbl_width_chars(-1)

# Single Trino engine; the cells below open their connections from it.
engine = get_trino_engine()

# Unfortunately, the way Nessie is built - this section won't work locally
# Reads the "events$snapshots" table for store.events and prints it.
snapshots_sql = 'SELECT * FROM store."events$snapshots"'
with engine.connect() as conn:
    snapshots = pl.read_database(snapshots_sql, conn)
    print(snapshots)
#%%
"""
Iceberg versions the schemas, so if we time-travel back to before the schema update, we get an error.
"""
# Count events per _loaded_at value, reading store.events as of one
# specific snapshot id (FOR VERSION AS OF <id>).
version_sql = """
SELECT count(event_time) as num_events, _loaded_at
FROM store.events FOR VERSION AS OF 8433192130702135638
group by _loaded_at
"""
with engine.connect() as conn:
    print(pl.read_database(version_sql, conn))
#%%
"""We can also travel in time based on timestamps"""
# Same aggregation as the snapshot-id query, but the table state is picked
# by wall-clock time (FOR TIMESTAMP AS OF) instead of by snapshot id.
timestamp_sql = sa.text(
    """
SELECT count(event_time) as num_events, _loaded_at
FROM store.events FOR TIMESTAMP AS OF TIMESTAMP '2025-02-26 20:33:50 UTC'
GROUP BY _loaded_at
"""
)
with engine.connect() as conn:
    rows = conn.execute(timestamp_sql)
    # Fetch through the underlying driver cursor's as_arrow() and let
    # polars build the frame from that.
    arrow_data = rows.cursor.as_arrow()
    print(pl.from_arrow(arrow_data))
#%%
"""We can tag a given snapshot to give it a name, such as Q4 report"""
from aws_iceberg_demo.catalog import get_catalog
# Load the table through the catalog; `t` is reused further down the file.
t = get_catalog().load_table("store.events")

# Name one snapshot id "q4_report" and commit the change.
(
    t.manage_snapshots()
    .create_tag(5463529947108187791, "q4_report")
    .commit()
)
#%%
"""Now that we have a named tag, we can reference it in our FROM statement to be able to see data as it was back then
Tags can be configured with an expiration date, e.g. keep it for 7 days or, in this case, keep it forever.
"""
# Count events per _loaded_at value as of the snapshot tagged 'q4_report'.
# The tag name is passed where a snapshot id would otherwise go.
with engine.connect() as conn:
    # Plain literal: the query has no placeholders, so no f-string prefix.
    sql = """
SELECT count(event_time) as num_events, _loaded_at
FROM store.events FOR VERSION AS OF 'q4_report'
GROUP BY _loaded_at
"""
    result = conn.execute(sa.text(sql))
    # Convert via the driver cursor's as_arrow() before handing to polars.
    print(pl.from_arrow(result.cursor.as_arrow()))
#%%
"""Let's add in some more data to demonstrate"""
import pathlib
import pyarrow.parquet as pq
# Directory holding the sampled monthly parquet files.
sampled_dir = pathlib.Path("data/parquet/sampled")
data_files = [
    "2020-Jan.parquet",
    "2020-Feb.parquet",
    "2020-Mar.parquet",
    "2020-Apr.parquet",
]

# Append each file to the table in turn, echoing the file name as progress.
for file_name in data_files:
    print(file_name)
    parquet_path = sampled_dir / file_name
    # Read with the table's current schema (converted to Arrow), re-fetched
    # on every iteration.
    arrow_table = pq.read_table(parquet_path, schema=t.schema().as_arrow())
    t.append(arrow_table)
#%%
# Events per calendar month in the current state of store.events.
monthly_sql = sa.text(
    "SELECT count(event_time) as num_events, date_trunc('month', event_time) "
    "FROM store.events group by 2"
)
with engine.connect() as conn:
    monthly_rows = conn.execute(monthly_sql)
    print(pl.from_arrow(monthly_rows.cursor.as_arrow()))
#%%
# Same per-month count, but read as of the 'q4_report' tag rather than
# the current table state.
tagged_monthly_sql = sa.text(
    "SELECT count(event_time) as num_events, "
    "date_trunc('month', event_time) "
    "FROM store.events "
    "FOR VERSION AS OF 'q4_report' "
    "GROUP BY 2"
)
with engine.connect() as conn:
    tagged_rows = conn.execute(tagged_monthly_sql)
    print(pl.from_arrow(tagged_rows.cursor.as_arrow()))