Skip to content

Commit 07cbf1b

Browse files
committed
move under optimize namespace
1 parent a62c8cf commit 07cbf1b

4 files changed

Lines changed: 128 additions & 77 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,12 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
import contextlib
2019
import itertools
21-
import logging
2220
import os
2321
import uuid
2422
import warnings
2523
from abc import ABC, abstractmethod
2624
from dataclasses import dataclass
27-
from datetime import timedelta
2825
from functools import cached_property
2926
from itertools import chain
3027
from types import TracebackType
@@ -90,6 +87,7 @@
9087
from pyiceberg.table.name_mapping import (
9188
NameMapping,
9289
)
90+
from pyiceberg.table.optimize import OptimizeTable
9391
from pyiceberg.table.refs import SnapshotRef
9492
from pyiceberg.table.snapshots import (
9593
Snapshot,
@@ -153,8 +151,6 @@
153151

154152
from pyiceberg.catalog import Catalog
155153

156-
logger = logging.getLogger(__name__)
157-
158154
ALWAYS_TRUE = AlwaysTrue()
159155
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
160156

@@ -912,6 +908,15 @@ def inspect(self) -> InspectTable:
912908
"""
913909
return InspectTable(self)
914910

911+
@property
def optimize(self) -> OptimizeTable:
    """Entry point for table-maintenance operations.

    Returns:
        An OptimizeTable wrapping this Table.
    """
    return OptimizeTable(self)
915920
def refresh(self) -> Table:
916921
"""Refresh the current table metadata.
917922
@@ -1376,28 +1381,6 @@ def to_polars(self) -> pl.LazyFrame:
13761381

13771382
return pl.scan_iceberg(self)
13781383

1379-
def delete_orphaned_files(self, older_than: timedelta = timedelta(days=3), dry_run: bool = False) -> None:
1380-
"""Delete orphaned files in the table."""
1381-
location = self.location()
1382-
orphaned_files = self.inspect.orphaned_files(location, older_than)
1383-
logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!")
1384-
1385-
def _delete(file: str) -> None:
1386-
# don't error if the file doesn't exist
1387-
# still catch ctrl-c, etc.
1388-
with contextlib.suppress(Exception):
1389-
self.io.delete(file)
1390-
1391-
if orphaned_files:
1392-
if dry_run:
1393-
logger.info(f"(Dry Run) Deleted {len(orphaned_files)} orphaned files at {location}!")
1394-
else:
1395-
executor = ExecutorFactory.get_or_create()
1396-
deletes = executor.map(_delete, orphaned_files)
1397-
# exhaust
1398-
list(deletes)
1399-
logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!")
1400-
14011384

14021385
class StaticTable(Table):
14031386
"""Load a table directly from a metadata file (i.e., without using a catalog)."""

pyiceberg/table/inspect.py

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
from datetime import datetime, timedelta, timezone
19+
from datetime import datetime, timezone
2020
from functools import reduce
2121
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast
2222

2323
from pyiceberg.conversions import from_bytes
24-
from pyiceberg.io import _parse_location
2524
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
2625
from pyiceberg.partitioning import PartitionSpec
2726
from pyiceberg.table.snapshots import Snapshot, ancestors_of
@@ -687,40 +686,3 @@ def all_known_files(self) -> dict[str, set[str]]:
687686
_all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set())
688687

689688
return _all_known_files
690-
691-
def orphaned_files(self, location: str, older_than: timedelta = timedelta(days=3)) -> Set[str]:
692-
"""Get all the orphaned files in the table.
693-
694-
Args:
695-
location: The location to check for orphaned files.
696-
older_than: The time period to check for orphaned files. Defaults to 3 days.
697-
698-
Returns:
699-
A set of orphaned file paths.
700-
701-
"""
702-
try:
703-
import pyarrow as pa # noqa: F401
704-
except ModuleNotFoundError as e:
705-
raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
706-
707-
from pyarrow.fs import FileSelector, FileType
708-
709-
from pyiceberg.io.pyarrow import _fs_from_file_path
710-
711-
all_known_files = self.all_known_files()
712-
flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set())
713-
714-
fs = _fs_from_file_path(self.tbl.io, location)
715-
716-
_, _, path = _parse_location(location)
717-
selector = FileSelector(path, recursive=True)
718-
# filter to just files as it may return directories, and filter on time
719-
as_of = datetime.now(timezone.utc) - older_than
720-
all_files = [
721-
f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of))
722-
]
723-
724-
orphaned_files = set(all_files).difference(flat_known_files)
725-
726-
return orphaned_files

pyiceberg/table/optimize.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
import contextlib
20+
import logging
21+
from datetime import datetime, timedelta, timezone
22+
from functools import reduce
23+
from typing import TYPE_CHECKING, Set
24+
25+
from pyiceberg.io import _parse_location
26+
from pyiceberg.utils.concurrent import ExecutorFactory
27+
28+
logger = logging.getLogger(__name__)
29+
30+
31+
if TYPE_CHECKING:
32+
from pyiceberg.table import Table
33+
34+
35+
class OptimizeTable:
    """Table-maintenance operations, exposed through ``Table.optimize``.

    Currently covers discovery and removal of orphaned files: files that
    live under the table location but are not referenced by any table
    metadata (as reported by ``InspectTable.all_known_files``).
    """

    tbl: Table

    def __init__(self, tbl: Table) -> None:
        self.tbl = tbl

        # Fail fast: the operations in this class rely on PyArrow for
        # filesystem access, so surface the missing dependency up front.
        try:
            import pyarrow as pa  # noqa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e

    def orphaned_files(self, location: str, older_than: timedelta | None = timedelta(days=3)) -> Set[str]:
        """Get all files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned".

        Args:
            location: The location to check for orphaned files.
            older_than: Only consider files older than this period, so that
                in-flight writes are not flagged. Pass ``None`` to consider
                every file regardless of age. Defaults to 3 days.

        Returns:
            A set of orphaned file paths.
        """
        # NOTE: PyArrow availability was already verified in __init__, so no
        # second import-guard is needed here.
        from pyarrow.fs import FileSelector, FileType

        from pyiceberg.io.pyarrow import _fs_from_file_path

        all_known_files = self.tbl.inspect.all_known_files()
        flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set())

        fs = _fs_from_file_path(self.tbl.io, location)

        _, _, path = _parse_location(location)
        selector = FileSelector(path, recursive=True)
        # get_file_info may also return directories; keep plain files only,
        # and apply the age cut-off when one was requested.
        as_of = datetime.now(timezone.utc) - older_than if older_than is not None else None
        all_files = [
            f.path for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of))
        ]

        return set(all_files).difference(flat_known_files)

    def remove_orphaned_files(self, older_than: timedelta | None = timedelta(days=3), dry_run: bool = False) -> None:
        """Remove files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned".

        Args:
            older_than: Only delete files older than this period. Pass ``None``
                to delete regardless of age. Defaults to 3 days.
            dry_run: If True, only log the files that would be deleted. Defaults to False.
        """
        location = self.tbl.location()
        orphaned_files = self.orphaned_files(location, older_than)
        logger.info("Found %d orphaned files at %s!", len(orphaned_files), location)

        def _delete(file: str) -> None:
            # Best effort: don't error if the file doesn't exist (it may have
            # been removed concurrently). suppress(Exception) still lets
            # KeyboardInterrupt/SystemExit propagate.
            with contextlib.suppress(Exception):
                self.tbl.io.delete(file)

        if orphaned_files:
            if dry_run:
                logger.info("(Dry Run) Would delete %d orphaned files at %s!", len(orphaned_files), location)
            else:
                executor = ExecutorFactory.get_or_create()
                deletes = executor.map(_delete, orphaned_files)
                # exhaust the lazy map so the deletes actually run
                list(deletes)
                logger.info("Deleted %d orphaned files at %s!", len(orphaned_files), location)
Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import os
1818
from datetime import datetime, timedelta
1919
from pathlib import Path, PosixPath
20-
from unittest.mock import PropertyMock, patch
20+
from unittest.mock import patch
2121

2222
import pyarrow as pa
2323
import pytest
@@ -35,8 +35,8 @@ def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
3535
return catalog
3636

3737

38-
def test_delete_orphaned_files(catalog: Catalog) -> None:
39-
identifier = "default.test_delete_orphaned_files"
38+
def test_remove_orphaned_files(catalog: Catalog) -> None:
39+
identifier = "default.test_remove_orphaned_files"
4040

4141
schema = Schema(
4242
NestedField(1, "city", StringType(), required=True),
@@ -69,17 +69,17 @@ def test_delete_orphaned_files(catalog: Catalog) -> None:
6969
assert orphaned_file.exists()
7070

7171
# assert no files deleted if dry run...
72-
tbl.delete_orphaned_files(dry_run=True)
72+
tbl.optimize.remove_orphaned_files(dry_run=True)
7373
assert orphaned_file.exists()
7474

7575
# should not delete because it was just created...
76-
tbl.delete_orphaned_files()
76+
tbl.optimize.remove_orphaned_files()
7777
assert orphaned_file.exists()
7878

7979
# modify creation date to be older than 3 days
8080
five_days_ago = (datetime.now() - timedelta(days=5)).timestamp()
8181
os.utime(orphaned_file, (five_days_ago, five_days_ago))
82-
tbl.delete_orphaned_files()
82+
tbl.optimize.remove_orphaned_files()
8383
assert not orphaned_file.exists()
8484

8585
# assert that all known files still exist...
@@ -89,8 +89,8 @@ def test_delete_orphaned_files(catalog: Catalog) -> None:
8989
assert Path(file).exists()
9090

9191

92-
def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None:
93-
identifier = "default.test_delete_orphaned_files"
92+
def test_remove_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None:
93+
identifier = "default.test_remove_orphaned_files"
9494

9595
schema = Schema(
9696
NestedField(1, "city", StringType(), required=True),
@@ -118,8 +118,7 @@ def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog)
118118
tbl.append(df)
119119

120120
file_that_does_not_exist = "foo/bar.baz"
121-
with patch.object(type(tbl), "inspect", new_callable=PropertyMock) as mock_inspect:
122-
mock_inspect.return_value.orphaned_files = lambda location, older_than: {file_that_does_not_exist}
121+
with patch.object(type(tbl.optimize), "orphaned_files", return_value={file_that_does_not_exist}):
123122
with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as mock_delete:
124-
tbl.delete_orphaned_files()
123+
tbl.optimize.remove_orphaned_files(timedelta(days=3))
125124
mock_delete.assert_called_with(file_that_does_not_exist)

0 commit comments

Comments
 (0)