diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml new file mode 100644 index 0000000..3364965 --- /dev/null +++ b/.github/workflows/benchmark.yml @@ -0,0 +1,58 @@ +name: Performance Benchmark + +on: + push: + branches: [ '**' ] + pull_request: + branches: [ 'main' ] + workflow_dispatch: + +permissions: + contents: write + +jobs: + benchmark: + name: RATS Benchmark (${{ matrix.os }}, ${{ matrix.ref_name }}) + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + ref: ["${{ github.sha }}", "5eaa1c6a82eb0a316ff2a6911e0ef7379add0bad"] + include: + - ref: "${{ github.sha }}" + ref_name: "Current HEAD" + - ref: "5eaa1c6a82eb0a316ff2a6911e0ef7379add0bad" + ref_name: "5eaa1c6 (Joshua V1)" + runs-on: ${{ matrix.os }} + defaults: + run: + shell: bash + steps: + - name: Checkout Code + uses: actions/checkout@v4 + with: + ref: ${{ github.sha }} + fetch-depth: 0 + + - name: Overlay src/ from Comparison Ref + if: matrix.ref != github.sha + run: | + git fetch origin ${{ matrix.ref }} --depth=1 + git checkout FETCH_HEAD -- src/ + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install Dependencies + run: | + python -m pip install --upgrade pip + python -m pip install -e .[test] + + - name: Run Benchmark + run: | + echo "## RATS Performance Benchmarks (${{ matrix.os }})" >> $GITHUB_STEP_SUMMARY + echo "Commit: ${{ matrix.ref_name }} (${{ matrix.ref }})" >> $GITHUB_STEP_SUMMARY + echo "Dataset: Klein Bottle (4D)" >> $GITHUB_STEP_SUMMARY + python tests/scripts/benchmark.py >> $GITHUB_STEP_SUMMARY diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 34c61d6..fa5db79 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,8 +29,8 @@ jobs: - name: Install Dependencies run: | - pip install --upgrade pip - pip install -e .[test] + python -m pip install --upgrade pip + python -m pip install -e .[test] - name: Set Fast Mode Flag id: set-flag diff --git a/.gitignore b/.gitignore index b9017f9..1cfc639 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ src/pyRATS/__pycache__/ src/pyRATS.egg-info +tests/data .vscode .DS_Store __pycache__ \ No newline at end of file diff --git a/README.md b/README.md index 064fe03..18ff892 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ import datasets, vis X, labels, _ = datasets.Datasets().kleinbottle4d(n=5000) # create a RATS object projecting the data to 2d while tearing the manifold -model = rats.RATS(d=2, k=28, eta_min=5, to_tear=True) +model = rats.RATS(n_components=2, n_neighbors=28, min_cluster_size=5, tear=True) y = model.fit_transform(X) # compute the gluing instructions along the tear @@ -61,6 +61,15 @@ python -m http.server 8000 ``` and opening http://localhost:8000 in your browser. +# Memory Management +`pyRATS` implements dynamic chunking to prevent memory spikes on large or high-dimensional datasets. By default, it attempts to use up to 75% of available system RAM. This measurement can fail and defaults to 4GB otherwise. If you need to explicitly set memory usage, set the `PYRATS_MEMORY_LIMIT` environment variable (in bytes): + +```bash +# Set pyRATS memory usage to 40GB +export PYRATS_MEMORY_LIMIT=42949672960 +python your_script.py +``` + Citation ---------- diff --git a/examples/__init__.py b/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/barbell_example.ipynb b/examples/barbell_example.ipynb index 910cddb..7a967ce 100644 --- a/examples/barbell_example.ipynb +++ b/examples/barbell_example.ipynb @@ -58,14 +58,14 @@ "metadata": {}, "outputs": [], "source": [ - "buml_obj = rats.RATS(\n", - " d=2,\n", - " k=28,\n", - " eta_min=5,\n", - " to_tear=True,\n", + "model = rats.RATS(\n", + " n_components=2,\n", + " n_neighbors=28,\n", + " min_cluster_size=5,\n", + " tear=True,\n", ")\n", "\n", - "y = buml_obj.fit_transform(X=X)\n", + "y = model.fit_transform(X=X)\n", "y" ] }, @@ -85,7 +85,7 @@ "outputs": [], "source": [ "tear_color_eig_inds = [0,1,2] # interior has green color so assigned smallest value of i to g-channel\n", - "color_of_pts_on_tear = buml_obj.compute_color_of_pts_on_tear(\n", + "color_of_pts_on_tear = model.compute_color_of_pts_on_tear(\n", " y,\n", " tear_color_eig_inds\n", ")\n", @@ -129,4 +129,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/examples/curvedtorus_example.ipynb b/examples/curvedtorus_example.ipynb index ece11d1..eb2a0b3 100644 --- a/examples/curvedtorus_example.ipynb +++ b/examples/curvedtorus_example.ipynb @@ -58,17 +58,17 @@ "metadata": {}, "outputs": [], "source": [ - "buml_obj = rats.RATS(\n", - " d=2,\n", - " k=21,\n", - " eta_min=5,\n", + "model = rats.RATS(\n", + " n_components=2,\n", + " n_neighbors=21,\n", + " min_cluster_size=5,\n", " nu=3,\n", - " max_iter=40,\n", - " to_tear=True,\n", - " cost_fn_name='distortion'\n", + " n_iter=40,\n", + " tear=True,\n", + " cost_function='distortion'\n", ")\n", "\n", - "y = buml_obj.fit_transform(X=X)\n", + "y = model.fit_transform(X=X)\n", "y" ] }, @@ -88,7 +88,7 @@ "outputs": [], "source": [ "tear_color_eig_inds = [6, 1, 4] # interior has green color so assigned smallest value of i to g-channel\n", - "color_of_pts_on_tear = buml_obj.compute_color_of_pts_on_tear(\n", + "color_of_pts_on_tear = model.compute_color_of_pts_on_tear(\n", " y,\n", " tear_color_eig_inds\n", ")\n", @@ -132,4 +132,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/examples/kleinbottle_example.ipynb b/examples/kleinbottle_example.ipynb index 8723479..993c57b 100644 --- a/examples/kleinbottle_example.ipynb +++ b/examples/kleinbottle_example.ipynb @@ -7,7 +7,7 @@ "metadata": {}, "outputs": [], "source": [ - "%pip install git+https://github.com/Mishne-Lab/pyRATS\n", + "# %pip install git+https://github.com/Mishne-Lab/pyRATS\n", "# if not already installed on the system:\n", "# %pip install pandas imageio seaborn matplotlib" ] @@ -19,9 +19,12 @@ "metadata": {}, "outputs": [], "source": [ + "import sys\n", + "sys.path.insert(0, '../src/')\n", + "\n", "from matplotlib import pyplot as plt\n", "\n", - "from pyRATS import rats\n", + "from pyRATS import rats, global_distortion\n", "import datasets, vis" ] }, @@ -58,17 +61,36 @@ "metadata": {}, "outputs": [], "source": [ - "buml_obj = rats.RATS(\n", - " d=2,\n", - " k=28,\n", - " eta_min=5,\n", - " to_tear=True,\n", + "model = rats.RATS(\n", + " n_components=2,\n", + " n_neighbors=28,\n", + " min_cluster_size=5,\n", + " tear=True,\n", ")\n", "\n", - "y = buml_obj.fit_transform(X=X)\n", + "y = model.fit_transform(X=X)\n", "y" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "0fdf51f9", + "metadata": {}, + "outputs": [], + "source": [ + "n_nbrs = global_distortion.get_n_nbrs_for_distortion_analysis(X)\n", + "\n", + "global_distortion.compute_global_distortion(\n", + " X,\n", + " y,\n", + " n_nbrs,\n", + " buml_obj,\n", + " metric=\"euclidean\",\n", + " max_crossings=20,\n", + ")" + ] + }, { "cell_type": "code", "execution_count": null, @@ -77,7 +99,7 @@ "outputs": [], "source": [ "tear_color_eig_inds = [7, 2, 4]\n", - "color_of_pts_on_tear = buml_obj.compute_color_of_pts_on_tear(\n", + "color_of_pts_on_tear = model.compute_color_of_pts_on_tear(\n", " y,\n", " tear_color_eig_inds, \n", ")\n", @@ -90,14 +112,6 @@ ")\n", "plt.show()" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c9b56299", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { @@ -121,4 +135,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/examples/small_noisyswissroll3_example.ipynb b/examples/small_noisyswissroll3_example.ipynb index f25db13..f21b196 100644 --- a/examples/small_noisyswissroll3_example.ipynb +++ b/examples/small_noisyswissroll3_example.ipynb @@ -58,15 +58,15 @@ "metadata": {}, "outputs": [], "source": [ - "buml_obj = rats.RATS(\n", - " d=2,\n", - " k=28,\n", - " eta_min=5,\n", - " to_tear=True,\n", - " cost_fn_name='alignment'\n", + "model = rats.RATS(\n", + " n_components=2,\n", + " n_neighbors=28,\n", + " min_cluster_size=5,\n", + " tear=True,\n", + " cost_function='alignment'\n", ")\n", "\n", - "y = buml_obj.fit_transform(X=X)\n", + "y = model.fit_transform(X=X)\n", "y" ] }, @@ -78,7 +78,7 @@ "outputs": [], "source": [ "tear_color_eig_inds = [0, 1, 2] # interior has green color so assigned smallest value of i to g-channel\n", - "color_of_pts_on_tear = buml_obj.compute_color_of_pts_on_tear(\n", + "color_of_pts_on_tear = model.compute_color_of_pts_on_tear(\n", " y,\n", " tear_color_eig_inds, \n", ")\n", @@ -122,4 +122,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/examples/small_spherewithhole_example.ipynb b/examples/small_spherewithhole_example.ipynb index 2836f35..2deb028 100644 --- a/examples/small_spherewithhole_example.ipynb +++ b/examples/small_spherewithhole_example.ipynb @@ -58,14 +58,14 @@ "metadata": {}, "outputs": [], "source": [ - "buml_obj = rats.RATS(\n", - " d=2,\n", - " k=28,\n", - " eta_min=5,\n", - " to_tear=True,\n", + "model = rats.RATS(\n", + " n_components=2,\n", + " n_neighbors=28,\n", + " min_cluster_size=5,\n", + " tear=True,\n", ")\n", "\n", - "y = buml_obj.fit_transform(X=X)\n", + "y = model.fit_transform(X=X)\n", "y" ] }, @@ -87,7 +87,7 @@ "figsize = (3,3)\n", "\n", "tear_color_eig_inds = [7, 1, 2]\n", - "color_of_pts_on_tear = buml_obj.compute_color_of_pts_on_tear(\n", + "color_of_pts_on_tear = model.compute_color_of_pts_on_tear(\n", " y,\n", " tear_color_eig_inds, \n", ")\n", @@ -131,4 +131,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/examples/small_swissrollwithhole.ipynb b/examples/small_swissrollwithhole.ipynb index 1cb8856..2b95103 100644 --- a/examples/small_swissrollwithhole.ipynb +++ b/examples/small_swissrollwithhole.ipynb @@ -58,14 +58,14 @@ "metadata": {}, "outputs": [], "source": [ - "buml_obj = rats.RATS(\n", - " d=2,\n", - " k=28,\n", - " eta_min=5,\n", - " to_tear=True,\n", + "model = rats.RATS(\n", + " n_components=2,\n", + " n_neighbors=28,\n", + " min_cluster_size=5,\n", + " tear=True,\n", ")\n", "\n", - "y = buml_obj.fit_transform(X=X)\n", + "y = model.fit_transform(X=X)\n", "y" ] }, @@ -85,7 +85,7 @@ "outputs": [], "source": [ "tear_color_eig_inds = [0, 1, 2] # interior has green color so assigned smallest value of i to g-channel\n", - "color_of_pts_on_tear = buml_obj.compute_color_of_pts_on_tear(\n", + "color_of_pts_on_tear = model.compute_color_of_pts_on_tear(\n", " y,\n", " tear_color_eig_inds, \n", ")\n", @@ -129,4 +129,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/examples/squarerwithtwoholes.ipynb b/examples/squarerwithtwoholes.ipynb index dbd0cdb..f18f1f7 100644 --- a/examples/squarerwithtwoholes.ipynb +++ b/examples/squarerwithtwoholes.ipynb @@ -58,14 +58,14 @@ "metadata": {}, "outputs": [], "source": [ - "buml_obj = rats.RATS(\n", - " d=2,\n", - " k=28,\n", - " eta_min=5,\n", - " to_tear=True,\n", + "model = rats.RATS(\n", + " n_components=2,\n", + " n_neighbors=28,\n", + " min_cluster_size=5,\n", + " tear=True,\n", ")\n", "\n", - "y = buml_obj.fit_transform(X=X)\n", + "y = model.fit_transform(X=X)\n", "y" ] }, @@ -86,7 +86,7 @@ "source": [ "figsize = (3,3)\n", "tear_color_eig_inds = [0, 1, 2] # interior has green color so assigned smallest value of i to g-channel\n", - "color_of_pts_on_tear = buml_obj.compute_color_of_pts_on_tear(\n", + "color_of_pts_on_tear = model.compute_color_of_pts_on_tear(\n", " y,\n", " tear_color_eig_inds, \n", ")\n", @@ -130,4 +130,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index aae6249..1cc94a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ where = ["src"] [project] name = "pyRATS" version = "0.0.1" -description = "" +description = "Riemannian Alignment of Tangent Spaces (RATS) for dimensionality reduction" readme = "README.md" license-files = ["License"] requires-python = ">=3.9" @@ -19,6 +19,7 @@ dependencies = [ "scipy", "joblib", "multiprocess", + "tqdm", ] [project.optional-dependencies] diff --git a/src/pyRATS/_tear_coloring.py b/src/pyRATS/_tear_coloring.py index 04a0131..3f0e8b7 100644 --- a/src/pyRATS/_tear_coloring.py +++ b/src/pyRATS/_tear_coloring.py @@ -3,7 +3,8 @@ from scipy.sparse.csgraph import connected_components, laplacian import scipy -from joblib import delayed, Parallel +from joblib import delayed, Parallel, parallel_backend +from tqdm import tqdm from pyRATS import _utils @@ -67,14 +68,13 @@ def compute_spectral_color_of_pts_on_tear( i_mat_in_emb=i_mat_in_emb, ) if pts_across_tear is None: - print("No tear detected.") + if verbose: + tqdm.write("No tear detected.") return None n_pts_across_tear = len(pts_across_tear) n_comp, labels = connected_components( tear_graph, directed=False, return_labels=True ) - if verbose: - print("Number of components in the tear graph:", n_comp, flush=True) n_points_in_comp = [] for i in range(n_comp): @@ -91,8 +91,6 @@ def compute_spectral_color_of_pts_on_tear( comp_i = labels == i n_comp_i = np.sum(comp_i) scale = n_comp_i / n_pts_across_tear - if verbose: - print("#points in the tear component no.", i, "are:", n_comp_i, flush=True) # If the component is very small then assign the constant color if n_comp_i <= max(3, int(color_cutoff_frac * n)): @@ -155,7 +153,7 @@ def compute_tear_graph( # If i_mat_in_emb if not provided if i_mat_in_emb is None: if verbose: - print("Metric used for computing incidence matrix in embedding:", metric) + tqdm.write(f"Metric used for computing incidence matrix in embedding: {metric}") i_mat_in_emb = _utils.compute_incidence_matrix_in_embedding( y, partition, k, nu, metric ) @@ -171,14 +169,13 @@ def compute_tear_graph( views_across_tear_graph.eliminate_zeros() # If no partitions/views are across the tear then there is no tear if len(views_across_tear_graph.data) == 0: - print("No views across the tear detected.") + if verbose: + tqdm.write("No views across the tear detected.") return None, None if verbose: - print( - "total #pairs of overlapping partitions/views:", - overlap.count_nonzero(), - flush=True, + tqdm.write( + f"total #pairs of overlapping partitions/views: {overlap.count_nonzero()}" ) pts_across_tear, tear_graph = compute_points_across_tear_graph( views_across_tear_graph, @@ -188,8 +185,8 @@ def compute_tear_graph( n_jobs=n_jobs, ) if verbose: - print("#vertices in tear graph =", tear_graph.shape[0], flush=True) - print("#edges in tear graph =", len(tear_graph.data), flush=True) + tqdm.write(f"#vertices in tear graph = {tear_graph.shape[0]}") + tqdm.write(f"#edges in tear graph = {len(tear_graph.data)}") return pts_across_tear, tear_graph @@ -208,7 +205,7 @@ def compute_points_across_tear_graph( ) n_pairs = len(views_across_tear_graph_row) if verbose: - print("#pairs of partitions/views across tear:", n_pairs, flush=True) + tqdm.write(f"#pairs of partitions/views across tear: {n_pairs}") # Define per-pair processing function def process_pairs(ij_pairs): @@ -261,10 +258,13 @@ def process_pairs(ij_pairs): else: ij_pairs_batches = [(views_across_tear_graph_row, views_across_tear_graph_col)] - # Run in parallel - results = Parallel(n_jobs=n_jobs)( - delayed(process_pairs)(ij_pairs) for ij_pairs in ij_pairs_batches - ) + # Run in parallel; cap inner BLAS threads so each loky worker doesn't + # spawn its own multi-threaded BLAS pool, which would cause n_jobs**2 + # thread oversubscription against ncores hardware threads. + with parallel_backend("loky", inner_max_num_threads=1): + results = Parallel(n_jobs=n_jobs)( + delayed(process_pairs)(ij_pairs) for ij_pairs in ij_pairs_batches + ) # Aggregate results tear_graph_row_inds = np.concatenate([r for r, _, _ in results if len(r)]) @@ -274,7 +274,7 @@ def process_pairs(ij_pairs): for _, _, pts in results: pts_across_tear_mask[pts] = True if verbose: - print("Computing tear graph.", flush=True) + tqdm.write("Computing tear graph.") # Build sparse tear graph tear_graph = csr_matrix( ( diff --git a/src/pyRATS/_utils.py b/src/pyRATS/_utils.py index d86aae3..f4a8bb5 100644 --- a/src/pyRATS/_utils.py +++ b/src/pyRATS/_utils.py @@ -1,26 +1,125 @@ import numpy as np from sklearn.neighbors import NearestNeighbors -from scipy.linalg import svd, svdvals +from scipy.linalg import svd, svdvals, eigh from scipy.sparse.linalg import svds from sklearn.decomposition import KernelPCA -from scipy.sparse import csr_matrix, triu +from scipy.sparse import csr_matrix, triu, block_diag, diags import itertools -from joblib import delayed, Parallel -import multiprocess as mp -from multiprocess import shared_memory - -from scipy.sparse.linalg import svds -from scipy.linalg import svd +from joblib import delayed, Parallel, parallel_backend +import multiprocess as mp_lib from scipy.spatial.distance import pdist, squareform -from scipy.sparse import csr_matrix, block_diag from sklearn.utils.extmath import svd_flip from scipy.sparse.csgraph import ( minimum_spanning_tree, connected_components, breadth_first_order, ) +from tqdm.auto import tqdm + +import os +import sys +import time +import warnings + +_MIN_MEMORY_FLOOR = 64 * 1024 * 1024 # 64 MB — always make some progress +_FALLBACK_MEMORY = 4 * 1024 * 1024 * 1024 # 4 GB if psutil is missing + + +def _cgroup_available_bytes(): + """Linux-only: bytes still available inside the current cgroup, or None. + + Reads cgroup v2 first, then falls back to v1. Returns None on any other + OS, when no limit is set, or when the files can't be parsed. + """ + if not sys.platform.startswith("linux"): + return None + pairs = ( + ("/sys/fs/cgroup/memory.max", "/sys/fs/cgroup/memory.current"), + ("/sys/fs/cgroup/memory/memory.limit_in_bytes", + "/sys/fs/cgroup/memory/memory.usage_in_bytes"), + ) + for max_path, cur_path in pairs: + try: + with open(max_path) as f: + raw = f.read().strip() + if not raw or raw == "max": + return None + limit = int(raw) + # cgroup v1 uses a huge sentinel when unconstrained. + if limit >= (1 << 62): + return None + with open(cur_path) as f: + used = int(f.read().strip()) + return max(0, limit - used) + except (OSError, ValueError): + continue + return None + + +_MEMORY_CACHE_TTL = 0.25 # seconds — fresh enough to track fit-phase allocations +_memory_cache: tuple[float, int] | None = None # (timestamp, bytes) + + +def _available_memory_bytes(): + """Live probe of usable memory in bytes, cached with a short TTL. + + Sources, all combined via min(): + * psutil.virtual_memory().available scaled by 0.75 (or 4GB fallback) + * cgroup v2/v1 remaining quota on Linux containers (avoids OOM-kill + before psutil sees pressure) + * PYRATS_MEMORY_LIMIT env var as a hard cap + + Result is cached for _MEMORY_CACHE_TTL seconds (default 250ms). This + eliminates psutil and cgroup file-read overhead on hot call sites (e.g. + iter_eval_ inside best()'s per-point loop) while still reacting to + large allocations between fit phases — the original "cache forever" bug. + Floored at 64MB so chunking can always make at least one row of progress. + """ + global _memory_cache + now = time.monotonic() + if _memory_cache is not None and now - _memory_cache[0] < _MEMORY_CACHE_TTL: + return _memory_cache[1] + + try: + import psutil + avail = int(psutil.virtual_memory().available * 0.75) + except (ImportError, AttributeError): + avail = _FALLBACK_MEMORY + + cgroup = _cgroup_available_bytes() + if cgroup is not None: + avail = min(avail, int(cgroup * 0.75)) + + user_cap = os.environ.get("PYRATS_MEMORY_LIMIT") + if user_cap: + try: + avail = min(avail, int(user_cap)) + except ValueError: + pass + + result = max(avail, _MIN_MEMORY_FLOOR) + _memory_cache = (now, result) + return result + + +# Backward-compatible alias for any external caller pinned to the old name. +def _get_available_memory(n_jobs=1): + return _available_memory_bytes() // max(1, n_jobs) + + +def _inner_blas_threads(n_jobs): + """BLAS threads per outer Parallel worker, sized to avoid oversubscription. + + Apple Accelerate / OpenBLAS otherwise spawn a pool sized to all cores + inside each outer worker, giving n_jobs * cpu_count software threads on + cpu_count hardware threads — the n_jobs ~ cpu_count/2 cliff. Pinning to 1 + fixes that but leaves cores idle when n_jobs < cpu_count. cpu_count // + n_jobs strikes the balance: full utilization, no contention. Floored at 1. + """ + cores = os.cpu_count() or 1 + return max(1, cores // max(1, n_jobs)) def lpca(X, d, U, n_jobs, verbose=False): @@ -102,10 +201,16 @@ def target_proc(p_num, chunk_sz): return start_ind, end_ind, Psi, mu, var_explained, n_pc_dir_chosen chunk_sz = int(n / n_jobs) - results = Parallel(n_jobs=n_jobs)( - delayed(target_proc)(i, chunk_sz) for i in range(n_jobs) - ) - results = [target_proc(0, n)] + if n_jobs == 1 or n < 1000: # Threshold for Parallel overhead + results = [target_proc(0, n)] + else: + with parallel_backend("loky", inner_max_num_threads=_inner_blas_threads(n_jobs)): + results = Parallel(n_jobs=n_jobs)( + delayed(target_proc)(i, chunk_sz) + for i in tqdm( + range(n_jobs), desc="PCA", unit="chunk", leave=False, disable=not verbose + ) + ) for i in range(len(results)): start_ind, end_ind, Psi_, mu_, var_explained_, n_pc_dir_chosen_ = results[i] @@ -114,9 +219,6 @@ def target_proc(p_num, chunk_sz): param.var_explained[start_ind:end_ind, :] = var_explained_ param.n_pc_dir_chosen[start_ind:end_ind] = n_pc_dir_chosen_ - if verbose: - print("local_param: all %d points processed..." % n) - return param @@ -165,7 +267,7 @@ def kpca(X, d, U, kernel, fit_inverse_transform, n_jobs, verbose=False): local_param.model = np.empty(n, dtype=object) local_param.zeta = np.zeros(n) - def target_proc(p_num, chunk_sz, q_): + def target_proc(p_num, chunk_sz): start_ind = p_num * chunk_sz if p_num == (n_jobs - 1): end_ind = n @@ -184,28 +286,20 @@ def target_proc(p_num, chunk_sz, q_): U_k = U[k] X_k = X[U_k, :] model_[k - start_ind].fit(X_k) - q_.put((start_ind, end_ind, model_)) + return start_ind, end_ind, model_ - q_ = mp.Queue() chunk_sz = int(n / n_jobs) - proc = [] - for p_num in range(n_jobs): - proc.append( - mp.Process(target=target_proc, args=(p_num, chunk_sz, q_), daemon=True) + with parallel_backend("loky", inner_max_num_threads=_inner_blas_threads(n_jobs)): + results = Parallel(n_jobs=n_jobs)( + delayed(target_proc)(p_num, chunk_sz) + for p_num in tqdm( + range(n_jobs), desc="KPCA", unit="chunk", leave=False, disable=not verbose + ) ) - proc[-1].start() - for p_num in range(n_jobs): - start_ind, end_ind, model_ = q_.get() + for start_ind, end_ind, model_ in results: local_param.model[start_ind:end_ind] = model_ - q_.close() - - for p_num in range(n_jobs): - proc[p_num].join() - - if verbose: - print("local_param: all %d points processed..." % n) return local_param @@ -260,16 +354,10 @@ def batched_pdist(x): x = np.array(x) n, k, _ = x.shape - res = np.empty((n, k * (k - 1) // 2)) - p = 0 - for i in range(k): - a = x[:, i] - bs = x[:, i + 1 :] - - cs = bs - a[:, None, :] - res[:, p : p + k - i - 1] = np.sum(cs * cs, axis=-1) - p += k - i - 1 - return np.sqrt(res) + # Precompute all upper-triangle pair indices (same order as the original loop) + pair_i, pair_j = np.triu_indices(k, k=1) # each shape (num_pairs,) + diff = x[:, pair_i, :] - x[:, pair_j, :] # (n, num_pairs, d) + return np.sqrt(np.sum(diff * diff, axis=-1)) # (n, num_pairs) def batched_procrustes_cost(X, Y): @@ -383,7 +471,7 @@ def nearest_neighbors(X, k, metric, sort_results=True, n_jobs=-1): def cost_of_moving_distortion( - k, d_e, neigh_ind_k, U_k, local_param, c, n_C, Utilde, eta_min, eta_max + k, d_e, neigh_ind_k, U_k, local_param, c, n_C, Utilde, eta_min, eta_max, n_jobs=1 ): """Computes the minimum cost and destination cluster possible when merging k with its neighboring clusters. @@ -447,7 +535,12 @@ def cost_of_moving_distortion( c_U_k_uniq = np.unique(c_U_k).tolist() cost_x_k_to = np.zeros(len(c_U_k_uniq)) + np.inf - U_k_list = list(U_k) + # Reconstruct the set from neigh_ind_k inside this process: pickling and + # unpickling a set rebuilds the hash table with a different bucket layout, + # so list(U_k) iterates in a different order in the parent vs. a loky + # worker. Building the set from the same numpy array in each process gives + # the same iteration order everywhere. + U_k_list = list(set(neigh_ind_k)) # Iterate over all m in c_{U_k} i = 0 @@ -475,7 +568,7 @@ def cost_of_moving_distortion( # that is cost_{x_k \rightarrow m} cost_x_k_to[i] = compute_zeta( d_e[np.ix_(U_k_U_Utilde_m, U_k_U_Utilde_m)], - local_param.eval_({"view_index": m, "data_mask": U_k_U_Utilde_m}), + local_param.eval_(m, U_k_U_Utilde_m), ) i += 1 @@ -493,18 +586,36 @@ def cost_of_moving_distortion( def compute_zeta(d_e_mask0, Psi_k_mask): - d_e_mask = d_e_mask0.toarray() - if d_e_mask.shape[0] == 1: + """Computes the local distortion zeta for a given parameterization and neighborhood distances. + + Parameters + ---------- + d_e_mask0 : sparse matrix or array-like + Pairwise distances between neighbors in the original space. + Psi_k_mask : array-like + Embedded coordinates of the neighbors. + """ + # For small k (typical in clustering), dense operations are MUCH faster than + # sparse triu/nonzero lookups. + d_e_mask = d_e_mask0.toarray() if hasattr(d_e_mask0, "toarray") else d_e_mask0 + if d_e_mask.shape[0] <= 1: return 1 + + # Use squareform/pdist for efficient pair extraction on small dense matrices d_e_mask_ = squareform(d_e_mask) mask = d_e_mask_ != 0 - d_e_mask_ = d_e_mask_[mask] - disc_lip_const = pdist(Psi_k_mask)[mask] / d_e_mask_ + if not np.any(mask): + return 1 + + d_e_vals = d_e_mask_[mask] + dist_embedded = pdist(Psi_k_mask)[mask] + + disc_lip_const = dist_embedded / d_e_vals return np.max(disc_lip_const) / (np.min(disc_lip_const) + 1e-12) def cost_of_moving_alignment_error( - k, d_e, neigh_ind_k, U_k, param, c, n_C, Utilde, eta_min, eta_max + k, d_e, neigh_ind_k, U_k, param, c, n_C, Utilde, eta_min, eta_max, n_jobs=1 ): """Computes the minimum cost and destination cluster possible when merging k with its neighboring clusters. @@ -568,7 +679,12 @@ def cost_of_moving_alignment_error( c_U_k_uniq = np.unique(c_U_k) cost_x_k_to = np.zeros(len(c_U_k_uniq)) + np.inf - U_k_list = list(U_k) + # Reconstruct the set from neigh_ind_k inside this process: pickling and + # unpickling a set rebuilds the hash table with a different bucket layout, + # so list(U_k) iterates in a different order in the parent vs. a loky + # worker. Building the set from the same numpy array in each process gives + # the same iteration order everywhere. + U_k_list = list(set(neigh_ind_k)) neighbor_mask = ( (n_C[c_U_k_uniq] < eta_max) & (n_C[c_U_k_uniq] >= n_C_c_k) & (c_U_k_uniq != c_k) @@ -577,10 +693,9 @@ def cost_of_moving_alignment_error( # c_U_k_uniq_k = np.union1d(c_U_k_uniq[neighbor_mask], c_k) c_U_k_uniq_k = np.union1d(c_U_k_uniq[neighbor_mask], k) evals = param.batched_eval_( - { - "view_index": c_U_k_uniq_k, - "data_mask": np.broadcast_to(U_k_list, [len(c_U_k_uniq_k), len(U_k_list)]), - } + c_U_k_uniq_k, + np.broadcast_to(U_k_list, [len(c_U_k_uniq_k), len(U_k_list)]), + n_jobs=n_jobs ) m = c_U_k_uniq_k == k @@ -672,90 +787,90 @@ def best(d_e, U, param, eta_min, eta_max, cost_fn, verbose, n_jobs): cost = np.zeros(n) + np.inf dest = np.zeros(n, dtype="int") - 1 - - shm_cost = shared_memory.SharedMemory(create=True, size=cost.nbytes) - np_cost = np.ndarray(cost.shape, dtype=cost.dtype, buffer=shm_cost.buf) - np_cost[:] = cost[:] - shm_dest = shared_memory.SharedMemory(create=True, size=dest.nbytes) - np_dest = np.ndarray(dest.shape, dtype=dest.dtype, buffer=shm_dest.buf) - np_dest[:] = dest[:] - - shm_cost_name = shm_cost.name - cost_shape = cost.shape - cost_dtype = cost.dtype - shm_dest_name = shm_dest.name - dest_shape = dest.shape - dest_dtype = dest.dtype + U_csc = U.tocsc() # Vary eta from 2 to eta_{min} - if verbose: - print("Constructing intermediate views.") - for eta in range(2, eta_min + 1): - if verbose: - print("eta = %d." % eta) - print( - "#non-empty views with sz < %d = %d" - % (eta, np.sum((n_C > 0) * (n_C < eta))) - ) - print("#nodes in views with sz < %d = %d" % (eta, np.sum(n_C[c] < eta))) - - def target_proc(p_num, chunk_sz, n_, Utilde, n_C, c, S): - existing_shm_cost = shared_memory.SharedMemory(name=shm_cost_name) - cost_ = np.ndarray( - cost_shape, dtype=cost_dtype, buffer=existing_shm_cost.buf - ) - existing_shm_dest = shared_memory.SharedMemory(name=shm_dest_name) - dest_ = np.ndarray( - dest_shape, dtype=dest_dtype, buffer=existing_shm_dest.buf - ) - + for eta in tqdm( + range(2, eta_min + 1), desc="Intermediate views", disable=not verbose, position=0, leave=True + ): + # tqdm.write( + # "#non-empty views with sz < %d = %d" + # % (eta, np.sum((n_C > 0) * (n_C < eta))) + # ) + # tqdm.write("#nodes in views with sz < %d = %d" % (eta, np.sum(n_C[c] < eta))) + + def target_proc(p_num, chunk_sz, n_, Utilde, n_C, c): start_ind = p_num * chunk_sz if p_num == (n_jobs - 1): end_ind = n_ else: end_ind = (p_num + 1) * chunk_sz - for k in range(start_ind, end_ind): - if S is None: - k1 = k - else: - k1 = S[k] - cost_[k1], dest_[k1] = cost_of_moving( - k1, d_e, neigh_ind[k1], U_[k1], param, c, n_C, Utilde, eta, eta_max + cost_ = np.zeros(end_ind - start_ind) + np.inf + dest_ = np.zeros(end_ind - start_ind, dtype="int") - 1 + for i, k in enumerate(range(start_ind, end_ind)): + cost_[i], dest_[i] = cost_of_moving( + k, d_e, neigh_ind[k], U_[k], param, c, n_C, Utilde, eta, eta_max, n_jobs=n_jobs ) + return start_ind, end_ind, cost_, dest_ - proc = [] chunk_sz = int(n / n_jobs) - for p_num in range(n_jobs): - proc.append( - mp.Process( - target=target_proc, - args=(p_num, chunk_sz, n, Utilde, n_C, c, None), - daemon=True, - ) - ) - proc[-1].start() - - for p_num in range(n_jobs): - proc[p_num].join() + if n_jobs == 1 or n < 1000: + results = [target_proc(0, n, n, Utilde, n_C, c)] + else: + # Use multiprocess.Process (fork) rather than joblib/loky (spawn). + # cost_of_moving reads Utilde[m], a Python set whose iteration + # order depends on its hash-table bucket layout. Spawn re-pickles + # the set and rebuilds the table with a different layout in each + # worker, so list(Utilde[m]) iterates in a different order than + # in the parent — permuting the rows/cols passed to + # local_param.eval_ and d_e, producing float-LSB cost diffs that + # flip argmin tie-breaks and yield divergent cluster + # assignments. Fork shares the parent's address space via + # copy-on-write, so the children see the exact same set layout + # and produce bit-identical costs to a serial run. + q = mp_lib.Queue() + def _worker(p_num): + q.put(target_proc(p_num, chunk_sz, n, Utilde, n_C, c)) + procs = [ + mp_lib.Process(target=_worker, args=(p_num,), daemon=True) + for p_num in range(n_jobs) + ] + for p in procs: + p.start() + results = [q.get() for _ in range(n_jobs)] + for p in procs: + p.join() + for start_ind, end_ind, cost_, dest_ in results: + cost[start_ind:end_ind] = cost_ + dest[start_ind:end_ind] = dest_ # Compute point with minimum cost # Compute k and cost^* - k = np.argmin(np_cost) - cost_star = np_cost[k] + k = np.argmin(cost) + cost_star = cost[k] - if verbose: - print("Costs computed when eta = %d." % eta) + # if verbose: + # tqdm.write("Costs computed when eta = %d." % eta) # Loop until minimum cost is inf + pbar_merge = None + if verbose: + pbar_merge = tqdm( + total=np.sum(n_C[c] < eta), + desc="Merging", + unit="pts", + leave=False, + position=1, + ) + total_len_S = 0 ctr = 0 while cost_star < np.inf: - # print(k, cost_star, flush=True) # Move x_k from cluster s to # dest_k and update variables s = c[k] - dest_k = np_dest[k] + dest_k = dest[k] c[k] = dest_k n_C[s] -= 1 n_C[dest_k] += 1 @@ -769,39 +884,39 @@ def target_proc(p_num, chunk_sz, n_, Utilde, n_C, c, S): if n_C[s] > 0: S_ = ( (c == dest_k) - | (np_dest == dest_k) - | np.array(U[:, list(Clstr[s])].sum(1), dtype=bool).flatten() + | (dest == dest_k) + | np.array(U_csc[:, list(Clstr[s])].sum(1), dtype=bool).flatten() ) else: - S_ = (c == dest_k) | (np_dest == dest_k) | (np_dest == s) + S_ = (c == dest_k) | (dest == dest_k) | (dest == s) S = np.where(S_)[0].tolist() len_S = len(S) total_len_S += len_S ctr += 1 for k in S: - np_cost[k], np_dest[k] = cost_of_moving( - k, d_e, neigh_ind[k], U_[k], param, c, n_C, Utilde, eta, eta_max + cost[k], dest[k] = cost_of_moving( + k, d_e, neigh_ind[k], U_[k], param, c, n_C, Utilde, eta, eta_max, n_jobs=1 ) - k = np.argmin(np_cost) - cost_star = np_cost[k] + if verbose: + pbar_merge.update(1) + + k = np.argmin(cost) + cost_star = cost[k] if verbose: - print( - "ctr=%d, total_len_S=%d, avg_len_S=%0.3f" - % (ctr, total_len_S, total_len_S / (ctr + 1e-12)) - ) - print( - "Remaining #nodes in views with sz < %d = %d" - % (eta, np.sum(n_C[c] < eta)) - ) - print("Done with eta = %d." % eta) + pbar_merge.close() + # tqdm.write( + # "ctr=%d, total_len_S=%d, avg_len_S=%0.3f" + # % (ctr, total_len_S, total_len_S / (ctr + 1e-12)) + # ) + # tqdm.write( + # "Remaining #nodes in views with sz < %d = %d" + # % (eta, np.sum(n_C[c] < eta)) + # ) + # tqdm.write("Done with eta = %d." % eta) - shm_cost.close() - shm_cost.unlink() - shm_dest.close() - shm_dest.unlink() return c, n_C @@ -870,8 +985,8 @@ def target_proc(p_num): m = W_rows[i] mpp = W_cols[i] mask = i_mat[m, :].multiply(i_mat[mpp, :]).nonzero()[1] - V_mmp = param.eval_({"view_index": m, "data_mask": mask}) - V_mpm = param.eval_({"view_index": mpp, "data_mask": mask}) + V_mmp = param.eval_(m, mask) + V_mpm = param.eval_(mpp, mask) Vbar_mmp = V_mmp - np.mean(V_mmp, 0)[np.newaxis, :] Vbar_mpm = V_mpm - np.mean(V_mpm, 0)[np.newaxis, :] # Compute ambiguity of the overlaps captured by singular values @@ -880,27 +995,29 @@ def target_proc(p_num): overlap_svals_[i - start_ind, :] = svdvals_ return (start_ind, end_ind, overlap_svals_) - res = Parallel(n_jobs=n_jobs, return_as="generator_unordered")( - delayed(target_proc)(p_num) for p_num in range(n_jobs) - ) + if n_jobs == 1 or n_elem < 1000: + res = [target_proc(p_num) for p_num in range(n_jobs)] + else: + with parallel_backend("loky", inner_max_num_threads=_inner_blas_threads(n_jobs)): + res = list(Parallel(n_jobs=n_jobs, + return_as="generator_unordered")( + delayed(target_proc)(p_num) for p_num in range(n_jobs) + )) for value in res: start_ind, end_ind, overlap_svals_ = value overlap_svals[start_ind:end_ind, :] = overlap_svals_ + # In most cases, when almost all overlaps have rank d, + # W_data = overlap_svals[:,-1], the d-th singular value of the overlap + # is sufficient to determine the priority of the overlaps. # overlap_svals[overlap_svals= M: + # B_tilde is (n, M). C becomes a tiny (M, M) dense matrix. + # Sparse matrix multiplication here is incredibly fast. + C = (B_tilde.T @ B_tilde).todense() + S2, V = eigh(C) + + # eigh returns ascending order; reverse to match standard SVD output + idx = np.argsort(S2)[::-1] + S2 = S2[idx] + V = V[:, idx] + + # Recover singular values and right singular vectors + SS = np.sqrt(np.maximum(S2, 0)) + VT = V.T + + # Recover left singular vectors: U = B_tilde @ V @ diag(1/SS) + with np.errstate(divide='ignore', invalid='ignore'): + SS_inv = np.where(SS > 1e-10, 1.0 / SS, 0.0) + + U12 = (B_tilde @ V) * SS_inv[np.newaxis, :] + else: + # Fallback if M > n: Compute (n, n) Gram matrix instead + C = (B_tilde @ B_tilde.T).todense() + S2, U12 = eigh(C) + + idx = np.argsort(S2)[::-1] + S2 = S2[idx] + U12 = U12[:, idx] + + SS = np.sqrt(np.maximum(S2, 0)) + + with np.errstate(divide='ignore', invalid='ignore'): + SS_inv = np.where(SS > 1e-10, 1.0 / SS, 0.0) + + VT = (U12.T @ B_tilde) * SS_inv[:, np.newaxis] + # ------------------------------------------- U12, VT = svd_flip(U12, VT) V = VT.T @@ -1285,40 +1444,59 @@ def compute_Lpinv_helpers(W): U2 = U12[:, m_1:] V1 = V[:, :m_1] V2 = V[:, m_1:] - return [D_1_inv_sqrt, D_2_inv_sqrt, U1, U2, V1, V2, Sigma_1, Sigma_2] + return [D_1_inv_sqrt[:,None], D_2_inv_sqrt[None,:], U1, U2, V1, V2, Sigma_1, Sigma_2] # Ngoc-Diep Ho, Paul Van Dooren, On the pseudo-inverse of the Laplacian of a bipartite graph def compute_Lpinv_MT(Lpinv_helpers, B): D_1_inv_sqrt, D_2_inv_sqrt, U1, U2, V1, V2, Sigma_1, Sigma_2 = Lpinv_helpers n = D_1_inv_sqrt.shape[0] - B_mean = B.mean(axis=1) + Md = B.shape[0] + M = B.shape[1] - n + + B_mean = np.array(B.mean(axis=1)) if len(B_mean.shape) == 1: B_mean = B_mean[:, None] - B_n = B - B_mean - B_n = np.asarray(B_n) - B1T = D_1_inv_sqrt * (B_n[:, :n].T) - B2T = D_2_inv_sqrt.T * (B_n[:, n:].T) - U1TB1T = np.matmul(U1.T, B1T) - U2TB1T = np.matmul(U2.T, B1T) - V1TB2T = np.matmul(V1.T, B2T) - V2TB2T = np.matmul(V2.T, B2T) + B1 = B[:, :n] + B2 = B[:, n:] + + # Optimized matrix-vector products using identities to avoid full dense B_n + # Identity: U^T (diag(D) (B - mu 1^T))^T = (U^T diag(D)) B^T - (U^T diag(D) 1) mu^T + + # Compute U^T * B1T terms + U1T_D1 = (U1 * D_1_inv_sqrt).T # (m1, n) + U1TB1T = (U1T_D1 @ B1.T) - (U1T_D1.sum(axis=1)[:, None] @ B_mean.T) + + U2T_D1 = (U2 * D_1_inv_sqrt).T # (M-m1, n) + U2TB1T = (U2T_D1 @ B1.T) - (U2T_D1.sum(axis=1)[:, None] @ B_mean.T) + + # Compute V^T * B2T terms + # D_2_inv_sqrt is (1, M) + V1T_D2 = (V1 * D_2_inv_sqrt.T).T # (m1, M) + V1TB2T = (V1T_D2 @ B2.T) - (V1T_D2.sum(axis=1)[:, None] @ B_mean.T) + + V2T_D2 = (V2 * D_2_inv_sqrt.T).T # (M-m1, M) + V2TB2T = (V2T_D2 @ B2.T) - (V2T_D2.sum(axis=1)[:, None] @ B_mean.T) + + # B1T and B2T are needed for the final sum, but we only materialize them once + # B1T = D_1_inv_sqrt * (B - B_mean)[:, :n].T + B1T = (B1.T.multiply(D_1_inv_sqrt)).toarray() - (D_1_inv_sqrt @ B_mean.T) temp1 = ( - -0.75 * np.matmul(U1, U1TB1T) - - 0.25 * np.matmul(U1, V1TB2T) - + np.matmul(U2, ((Sigma_1 - 1)) * (U2TB1T)) - + np.matmul(U2, Sigma_2 * (V2TB2T)) + -0.75 * (U1 @ U1TB1T) + - 0.25 * (U1 @ V1TB2T) + + (U2 @ ((Sigma_1 - 1) * U2TB1T)) + + (U2 @ (Sigma_2 * V2TB2T)) + B1T ) temp1 = temp1 * D_1_inv_sqrt temp2 = ( - -0.25 * np.matmul(V1, U1TB1T) - + 0.25 * np.matmul(V1, V1TB2T) - + np.matmul(V2, Sigma_2 * (U2TB1T)) - + np.matmul(V2, Sigma_1 * (V2TB2T)) + -0.25 * (V1 @ U1TB1T) + + 0.25 * (V1 @ V1TB2T) + + (V2 @ (Sigma_2 * U2TB1T)) + + (V2 @ (Sigma_1 * V2TB2T)) ) temp2 = temp2 * D_2_inv_sqrt.T @@ -1335,48 +1513,72 @@ def compute_CC(D, B, Lpinv_BT): def build_ortho_optim(d, Utilde, param, verbose): """Compute the Graph-Laplacian's inverse times B^\top.""" M, n = Utilde.shape - B_row_inds = [] - B_col_inds = [] - B_vals = [] - D = [] - W = Utilde.astype(float) - W_vals = W.data - + W_vals_all = W.data + + # Vectorized construction of D and B components + D_list = [] + + # We still loop over M to build B values, but we use pre-allocated arrays + # or better, compute B values in blocks. + B_data_vals = [] + B_cluster_vals = [] + B_cols = [] + for i in range(M): Utilde_i = Utilde[i, :].indices - X_ = param.eval_({"view_index": i, "data_mask": Utilde_i}) - sqrt_p_ki = np.sqrt(np.array(W[i, :].data).flatten()[:, None]) - X_ = sqrt_p_ki * X_ - D.append(np.matmul(X_.T, X_)) - - row_inds = list(range(d * i, d * (i + 1))) - col_inds = Utilde_i.tolist() - - B_row_inds += row_inds + np.repeat(row_inds, len(col_inds)).tolist() - B_col_inds += np.repeat([n + i], d).tolist() + np.tile(col_inds, d).tolist() - - X_ = sqrt_p_ki * X_ - B_vals += np.sum(-X_.T, axis=1).tolist() + X_.T.flatten().tolist() - - D = block_diag(D, format="csr") - B = csr_matrix((B_vals, (B_row_inds, B_col_inds)), shape=(M * d, n + M)) - - if verbose: - print("min and max weights:", np.array(W_vals).min(), np.array(W_vals).max()) + X_ = param.eval_(i, Utilde_i) + weights_i = W_vals_all[W.indptr[i]:W.indptr[i+1]] + sqrt_p_ki = np.sqrt(weights_i[:, None]) + + # Weighted embeddings for D: sqrt(W) * X + X_weighted = sqrt_p_ki * X_ + D_list.append(X_weighted.T @ X_weighted) + + # Weighted embeddings for B: W * X + X_B = weights_i[:, None] * X_ + B_data_vals.append(X_B.T.flatten()) + B_cluster_vals.append(np.sum(-X_B.T, axis=1)) + B_cols.append(Utilde_i) + + D = block_diag(D_list, format="csr") + + # Efficiently construct B + B_data_vals = np.concatenate(B_data_vals) + B_cluster_vals = np.concatenate(B_cluster_vals) + + B_cols_data = [] + for i in range(M): + B_cols_data.append(np.tile(B_cols[i], d)) + B_cols_data = np.concatenate(B_cols_data) + + # Row indices for data values + counts = np.diff(W.indptr) + B_rows_data = np.repeat(np.arange(M) * d, counts * d) + np.tile(np.arange(d), len(B_cols_data) // d) + # Wait, the above tiling is not quite right if counts vary. + # Correct rows: repeat each row index (i*d + offset) + B_rows_data = [] + for i in range(M): + r = np.arange(i * d, (i + 1) * d) + B_rows_data.append(np.repeat(r, counts[i])) + B_rows_data = np.concatenate(B_rows_data) + + # B indices for cluster nodes + B_rows_cluster = np.arange(M * d) + B_cols_cluster = np.repeat(np.arange(n, n + M), d) + + # Combine everything + B_row_all = np.concatenate([B_rows_cluster, B_rows_data]) + B_col_all = np.concatenate([B_cols_cluster, B_cols_data]) + B_val_all = np.concatenate([B_cluster_vals, B_data_vals]) + + B = csr_matrix((B_val_all, (B_row_all, B_col_all)), shape=(M * d, n + M)) - print( - f"Computing Pseudoinverse of a matrix of L of size {n} + {M} multiplied with B", - flush=True, - ) Lpinv_helpers = compute_Lpinv_helpers(W) - Lpinv_BT = compute_Lpinv_MT(Lpinv_helpers, B) CC = compute_CC(D, B, Lpinv_BT) - CC_net = CC - - return CC_net, Lpinv_BT, D, B + return CC, Lpinv_BT, D, B # unscaled alignment error @@ -1451,14 +1653,14 @@ def procrustes_init(seq, rho, y, is_visited_view, d, Utilde, C, param): Utilde_s_mp = Utilde_s.multiply(Utilde[mp, :]).nonzero()[1] n_Utilde_s_Z_s[Utilde_s_mp] += 1 mu_s[Utilde_s_mp, :] += param.eval_( - {"view_index": mp, "data_mask": Utilde_s_mp} + mp, Utilde_s_mp ) # Compute T_s and v_s by aligning the embedding of the overlap # between sth view and the views in Z_s, with the centroid mu_s temp = n_Utilde_s_Z_s > 0 mu_s = mu_s[temp, :] / n_Utilde_s_Z_s[temp, np.newaxis] - V_s_Z_s = param.eval_({"view_index": s, "data_mask": temp}) + V_s_Z_s = param.eval_(s, temp) T_s, v_s = procrustes(V_s_Z_s, mu_s) @@ -1471,72 +1673,11 @@ def procrustes_init(seq, rho, y, is_visited_view, d, Utilde, C, param): # Compute global embedding of point in sth cluster C_s = C[s, :].indices - y[C_s, :] = param.eval_({"view_index": s, "data_mask": C_s}) + y[C_s, :] = param.eval_(s, C_s) return y, is_visited_view -def procrustes_final( - y, d, Utilde, C, intermed_param, seq_of_intermed_views_in_cluster, global_opts -): - M, n = Utilde.shape - # Traverse over intermediate views in a random order - seq = np.random.permutation(M) - is_first_view_in_cluster = np.zeros(M, dtype=bool) - # is_first_view_in_cluster[i] = True if the ith view is the first - # view in some cluster of views - for i in range(len(seq_of_intermed_views_in_cluster)): - is_first_view_in_cluster[seq_of_intermed_views_in_cluster[i][0]] = True - - y_due_to_all_views = [] - for k in range(n): - y_due_to_all_views.append({}) - - for s in range(M): - Utilde_s = Utilde[s, :].nonzero()[1] - y_Utilde_s = intermed_param.eval_({"view_index": s, "data_mask": Utilde_s}) - for k in range(len(Utilde_s)): - y_due_to_all_views[Utilde_s[k]][s] = y_Utilde_s[k, :] - - # For a given seq, refine the global embedding - for _ in range(global_opts["max_internal_iter"]): - for s in seq.tolist(): - # # Never refine s_0th intermediate view - # if is_first_view_in_cluster[s]: - # continue - - Utilde_s = Utilde[s, :].nonzero()[1] - mu_s = [] - Utilde_s_ = [] - for k_ in range(len(Utilde_s)): - k = Utilde_s[k_] - if len(y_due_to_all_views[k]) == 1: - continue - Utilde_s_.append(k) - mu = np.array(list(y_due_to_all_views[k].values())).sum(axis=0) - mu = (mu - y_due_to_all_views[k][s]) / (len(y_due_to_all_views[k]) - 1) - mu_s.append(mu) - mu_s = np.array(mu_s) - Utilde_s_ = np.array(Utilde_s_) - - # Compute T_s and v_s by aligning the embedding of the overlap - # between sth view and the views in Z_s, with the centroid mu_s - V_s_Z_s = intermed_param.eval_({"view_index": s, "data_mask": Utilde_s_}) - T_s, v_s = procrustes(V_s_Z_s, mu_s) - - # Update T_s, v_s - intermed_param.T[s, :, :] = np.matmul(intermed_param.T[s, :, :], T_s) - intermed_param.v[s, :] = ( - np.matmul(intermed_param.v[s, :][np.newaxis, :], T_s) + v_s - ) - y_Utilde_s = intermed_param.eval_({"view_index": s, "data_mask": Utilde_s}) - for k in range(len(Utilde_s)): - y_due_to_all_views[Utilde_s[k]][s] = y_Utilde_s[k, :] - - y = np.zeros((n, d)) - for k in range(n): - y[k, :] = np.array(list(y_due_to_all_views[k].values())).mean(axis=0) - return y class Param: @@ -1571,54 +1712,108 @@ def __init__(self, algo="lpca", **kwargs): # For KPCA etc self.model = None - self.X = None - self.y = None self.add_dim = False self.standardize = False - def batched_eval_(self, opts): - """Maps multiple points to the new space through their local (K-)PCA prameterizations. + def iter_eval_(self, view_index, data_mask, peak_bytes_per_row=0): + """Stream the local-(K-)PCA embedding in memory-bounded chunks. + + Yields ``(slice, chunk)`` pairs where ``chunk`` is the embedded + sub-array for ``view_index[slice]`` / ``data_mask[slice]``. Lets + callers reduce per chunk (e.g. compute pdists, take a max) without + ever materializing the full ``(n_eval, k, d)`` output. Parameters ------ - opts : dict - Must have keys - view_index : array-like, shape (n_points) - The indices of the parameterization to use. - - data_mask : array-like, shape (n_points, n_neighbors) - List of points to map to the embedding dimension for each point in 'view_index' + view_index : array-like, shape (n_points,) + data_mask : array-like, shape (n_points, n_neighbors) + peak_bytes_per_row : int, default=0 + Caller-provided estimate of additional bytes the downstream + pipeline will allocate per row (e.g. batched_pdist's diff + buffer). Folded into the chunk-size budget so the *whole* + pipeline stays within available memory, not just this method. + + On MemoryError the chunk size is halved and the slice is retried; + a one-time RuntimeWarning is emitted so the user knows they're at + the limit. """ - - ks = opts["view_index"] - masks = opts["data_mask"] + ks = np.asarray(view_index) if not isinstance(view_index, np.ndarray) else view_index + masks = data_mask + n_eval = len(ks) + if n_eval == 0: + return if self.algo == "lpca": - temp = np.matmul( - self.X[masks] - self.mu[ks][:, np.newaxis, :], self.Psi[ks] - ) - n = self.X.shape[0] + n_neighbors = masks.shape[1] + n_features = self.X.shape[1] + d = self.Psi.shape[2] + itemsize = self.X.dtype.itemsize + # Per-row peak inside this method: input gather + output buffer. + own_per_row = (n_neighbors * n_features + n_neighbors * d) * itemsize + per_row = max(1, own_per_row + max(0, int(peak_bytes_per_row))) + + chunk_size = max(1, _available_memory_bytes() // per_row) + chunk_size = min(chunk_size, n_eval) + + warned = False + start = 0 + while start < n_eval: + end = min(start + chunk_size, n_eval) + sl = slice(start, end) + ks_chunk = ks[sl] + masks_chunk = masks[sl] + try: + chunk = np.matmul( + self.X[masks_chunk] - self.mu[ks_chunk][:, np.newaxis, :], + self.Psi[ks_chunk], + ) + except MemoryError: + if chunk_size <= 1: + raise + chunk_size = max(1, chunk_size // 2) + if not warned: + warnings.warn( + "pyRATS: hit MemoryError during local embedding; " + f"halving chunk size to {chunk_size} and retrying. " + "Consider lowering n_neighbors or setting " + "PYRATS_MEMORY_LIMIT.", + RuntimeWarning, + stacklevel=2, + ) + warned = True + continue + chunk = self._apply_post_(ks_chunk, masks_chunk, chunk) + yield sl, chunk + start = end else: - temp = [] + # KPCA path is already row-by-row; yield singletons so callers + # have a uniform streaming interface. for i, k in enumerate(ks): X_ = self.X[masks[i], :] if self.standardize: X_ = X_ - np.mean(X_, axis=0)[None, :] X_ = X_ / (np.std(X_, axis=0, ddof=1)[None, :] + 1e-12) - temp.append(self.model[k].transform(X_)) - temp = np.array(temp) + chunk = self.model[k].transform(X_)[None, ...] + ks_chunk = ks[i:i + 1] + masks_chunk = masks[i:i + 1] + chunk = self._apply_post_(ks_chunk, masks_chunk, chunk) + yield slice(i, i + 1), chunk + + def _apply_post_(self, ks, masks, temp): + """Apply noise / add_dim / b / T / v to a chunk of embedded points. + Factored out of batched_eval_ so iter_eval_ can apply the same + post-processing per chunk. Semantics match the original inline code. + """ if self.noise_var: np.random.seed(self.noise_seed[0]) - temp2 = np.random.normal(0, self.noise_var, (n, temp.shape[2])) + temp2 = np.random.normal(0, self.noise_var, (self.X.shape[0], temp.shape[2])) temp = temp + temp2[masks, :] if self.noise is not None: temp = temp + self.noise[masks] - if self.add_dim: temp = np.concatenate([temp, np.zeros((*temp.shape[:2], 1))], axis=2) - if self.b is not None: temp = temp * self.b[ks][:, None, None] if self.T is not None: @@ -1627,22 +1822,57 @@ def batched_eval_(self, opts): temp = temp + self.v[[ks], :] return temp - def eval_(self, opts, apply_b=True): + def batched_eval_(self, view_index, data_mask, n_jobs=1): + """Materialize the full local-(K-)PCA embedding. + + Thin wrapper around iter_eval_ for callers that need the whole + ``(n_eval, n_neighbors, d)`` array (e.g. clustering's procrustes + cost). Memory-bounded internally via iter_eval_'s chunking and + MemoryError backoff. + + ``n_jobs`` is accepted for backward compatibility and ignored; + the live memory probe in iter_eval_ already reflects whatever + sibling workers have allocated. + """ + ks = view_index + masks = data_mask + n_eval = len(ks) + if n_eval == 0: + d = self.Psi.shape[2] if self.algo == "lpca" else 0 + return np.zeros((0, masks.shape[1] if hasattr(masks, "shape") else 0, d)) + + it = self.iter_eval_(ks, masks) + first_sl, first_chunk = next(it) + if first_sl.stop == n_eval: + # Common path: everything fit in one chunk — return directly, + # no extra allocation or copy. + return first_chunk + + # Multi-chunk path: accumulate into a pre-allocated output array. + out = np.empty((n_eval, first_chunk.shape[1], first_chunk.shape[2]), + dtype=first_chunk.dtype) + out[first_sl] = first_chunk + for sl, chunk in it: + out[sl] = chunk + return out + + def eval_(self, view_index, data_mask, apply_b=True): """Maps points to the new space through their local (K-)PCA prameterizations. Parameters ------ - opts : dict - Must have keys - 'view_index : int - The index of the parameterization to use. + view_index : int + The index of the parameterization to use. + + data_mask : array-like, shape (n_neighbors) + List of points to map to the embedding dimension. - data_mask : array-like, shape (n_neighbors) - List of points to map to the embedding dimension. + apply_b : bool, default=True + Whether to apply the scale transformation b[k]. """ - k = opts["view_index"] - mask = opts["data_mask"] + k = view_index + mask = data_mask if self.algo == "lpca": temp = np.dot( @@ -1692,9 +1922,9 @@ def replace_(self, new_param_ind): else: # ISOMAP, LKPCA self.model = self.model[new_param_ind] - def reconstruct_(self, opts): - k = opts["view_index"] - y_ = opts["embeddings"] + def reconstruct_(self, view_index, embeddings): + k = view_index + y_ = embeddings if self.algo == "LPCA": temp = ( np.dot( diff --git a/src/pyRATS/global_distortion.py b/src/pyRATS/global_distortion.py new file mode 100644 index 0000000..c96db6c --- /dev/null +++ b/src/pyRATS/global_distortion.py @@ -0,0 +1,282 @@ +from scipy.sparse import csr_matrix +from scipy.sparse.csgraph import shortest_path +from sklearn.neighbors import NearestNeighbors +from pyRATS._tear_coloring import compute_tear_graph +import numpy as np +from joblib import Parallel, delayed + + +def _shortest_paths( + X, + k, + metric="euclidean", +): + nbrs = NearestNeighbors(n_neighbors=k, metric=metric).fit(X) + knn_graph = nbrs.kneighbors_graph(mode="distance") + + return shortest_path( + knn_graph, + return_predecessors=False, + directed=False, + ) + + +def _compute_tear_aware_shortest_path_distances( + param, + y, + Utilde, + n_Utilde_Utilde, + Utildeg, + C, + c, + k, + nu, + metric="euclidean", + max_crossings=5, + tol=1e-6, + dist=None, + n_batches=None, + debug=False, + n_jobs=-1, +): + if n_batches is None: + n_batches = n_jobs + if dist is None: + dist = _shortest_paths(y, n_nbrs=k, metric=metric, return_predqecessors=False) + + pts_across_tear, tear_graph = compute_tear_graph( + y, + Utilde, + C, + n_Utilde_Utilde, + k, + nu, + metric, + False, + n_jobs, + i_mat_in_emb=Utildeg, + ) + if tear_graph is None: + return dist + + tear_graph = tear_graph.tocoo() + tear_graph_row_inds = tear_graph.row + tear_graph_col_inds = tear_graph.col + tear_graph_with_weights = _add_weights_to_tear_graph( + param, tear_graph, pts_across_tear, c + ) + dist_of_pts_across_tear = tear_graph_with_weights.data + tear_graph_row_inds = pts_across_tear[tear_graph_row_inds] + tear_graph_col_inds = pts_across_tear[tear_graph_col_inds] + + # make shortcuts in dist + for k in range(len(tear_graph_row_inds)): + dist[tear_graph_row_inds[k], tear_graph_col_inds[k]] = dist_of_pts_across_tear[ + k + ] + + final_dist = _recompute_dist_using_tear( + dist, + pts_across_tear, + max_crossings=max_crossings, + n_batches=n_batches, + tol=tol, + debug=debug, + ) + + return final_dist + + +def _add_weights_to_tear_graph( + param, tear_graph, pts_across_tear, cluster_label, n_batches=8 +): + tear_graph = tear_graph.copy().tocoo() + tear_graph_row_inds = tear_graph.row + tear_graph_col_inds = tear_graph.col + n_edges = len(tear_graph_row_inds) + dist_of_pts_across_tear = np.zeros(n_edges) + + def process_(start_ind, end_ind): + temp = np.zeros(end_ind - start_ind) + np.inf + for k in range(start_ind, end_ind): + r_i = tear_graph_row_inds[k] + c_i = tear_graph_col_inds[k] + edge_i = pts_across_tear[r_i] + edge_j = pts_across_tear[c_i] + # for view_ind in view_cont_pts_across_tear[(edge_i, edge_j)]: + for view_ind in [cluster_label[edge_i], cluster_label[edge_j]]: + local_coords = param.eval_( + {"data_mask": [edge_i, edge_j], "view_index": view_ind} + ) + temp[k - start_ind] = min( + temp[k - start_ind], + np.linalg.norm(local_coords[0, :] - local_coords[1, :]), + ) + return temp + + chunk_sz = n_edges // n_batches + start_end_inds = [] + for i in range(n_batches): + start_ind = i * chunk_sz + if i < n_batches - 1: + end_ind = start_ind + chunk_sz + else: + end_ind = n_edges + start_end_inds.append((start_ind, end_ind)) + + results = Parallel(n_jobs=n_batches)( + delayed(process_)(a, b) for a, b in start_end_inds + ) + dist_of_pts_across_tear = np.concatenate(results) + + return csr_matrix( + (dist_of_pts_across_tear, (tear_graph_row_inds, tear_graph_col_inds)), + shape=tear_graph.shape, + ) + + +def _recompute_dist_using_tear( + dist, pts_across_tear, max_crossings=20, n_batches=32, tol=1e-6, debug=False +): + if n_batches < 1: + n_batches = 5 + + n = dist.shape[0] + if debug: + dists = [dist.copy()] + for i_crossing in range(max_crossings): + old_dist = dist.copy() + dist_from_pts_across_tear = dist[pts_across_tear, :].T # n x n_pts_across_tear + + def process_(dist, start_ind, end_ind): + dist_ = np.zeros((end_ind - start_ind, n)) + for i_ in range(start_ind, end_ind): + i = i_ - start_ind + dist_[i, :] = np.minimum( + dist[i, :], + np.minimum( + dist[i, :], + np.min( + dist_from_pts_across_tear[i_, :][:, None] + + dist_from_pts_across_tear.T, + axis=0, + ), + ), + ) + return dist_ + + start_end_inds = [] + chunk_sz = n // n_batches + for j in range(n_batches): + start_ind = j * chunk_sz + if j < n_batches - 1: + end_ind = start_ind + chunk_sz + else: + end_ind = n + start_end_inds.append((start_ind, end_ind)) + results = Parallel(n_jobs=n_batches)( + delayed(process_)(dist.copy()[a:b, :], a, b) for a, b in start_end_inds + ) + + for i in range(len(results)): + start_ind, end_ind = start_end_inds[i] + dist[start_ind:end_ind, :] = results[i] + + if debug: + dists.append(dist.copy()) + + mean_abs_rel_diff = np.ma.masked_invalid( + np.abs(dist - old_dist) / (old_dist + 1e-12) + ).mean() + + if mean_abs_rel_diff < tol: + break + + return dist + + +def _compute_distortion_at(y_d_e, s_d_e): + scale_factors = (y_d_e + 1e-12) / (s_d_e + 1e-12) + mask = np.ones(scale_factors.shape, dtype=bool) + np.fill_diagonal(mask, 0) + max_distortion = np.max(scale_factors[mask]) / np.min(scale_factors[mask]) + n = y_d_e.shape[0] + distortion_at = np.zeros(n) + mask = np.ones(n, dtype=bool) + for i in range(n): + mask[i] = 0 + distortion_at[i] = np.max(scale_factors[i, mask]) / np.min( + scale_factors[i, mask] + ) + mask[i] = 1 + return distortion_at, max_distortion + + +def get_n_nbrs_for_distortion_analysis( + X, + n_nbrs_list=list(range(5, 16)), + tol=1e-3, + persistence=3, + metric="euclidean", +): + + gt_dist_list = [_shortest_paths(X, k, metric=metric) for k in n_nbrs_list] + + spd_rel_mean_abs_diff = [] + for i in range(1, len(gt_dist_list)): + temp = np.mean( + np.abs( + (gt_dist_list[i] - gt_dist_list[i - 1]) / (gt_dist_list[i - 1] + 1e-12) + ) + ) + spd_rel_mean_abs_diff.append(temp) + + while True: + temp = np.array(spd_rel_mean_abs_diff) < tol + if np.sum(temp) == 0: + tol = 2 * tol + else: + temp2 = np.zeros(len(temp) - persistence + 1, dtype=bool) + for i in range(len(temp) - persistence + 1): + if np.prod(temp[i : i + persistence]): + temp2[i] = 1 + if np.sum(temp2) == 0: + tol = 2 * tol + else: + n_nbrs_ind = np.argmax(temp2) + break + + return n_nbrs_list[n_nbrs_ind] + + +def compute_global_distortion( + X, + y, + n_nbrs, + model, + metric="euclidean", + max_crossings=20, +): + + gt_dist = _shortest_paths(X, n_nbrs, metric=metric) + + emb_dist = _shortest_paths(y, n_nbrs) + + if model.to_tear: + emb_dist = _compute_tear_aware_shortest_path_distances( + model.param, + y, + model.Utilde, + model.n_Utilde_Utilde, + model.Utildeg, + model.C, + model.c, + n_nbrs, + model.nu, + metric, + max_crossings=max_crossings, + dist=emb_dist, + n_jobs=model.n_jobs, + ) + + return _compute_distortion_at(emb_dist, gt_dist) diff --git a/src/pyRATS/rats.py b/src/pyRATS/rats.py index 175942d..7895a3b 100644 --- a/src/pyRATS/rats.py +++ b/src/pyRATS/rats.py @@ -1,8 +1,10 @@ import numpy as np +import warnings from scipy.sparse import csr_matrix -from scipy.spatial.distance import squareform from multiprocessing import cpu_count +from joblib import Parallel, delayed +from tqdm.auto import tqdm from pyRATS._utils import ( nearest_neighbors, @@ -18,71 +20,78 @@ from pyRATS._tear_coloring import compute_color_of_pts_on_tear +# _postprocess_col_range removed to reduce Parallel overhead in tight loops. + + class RATS: """Riemannian Alignment of Tangent Spaces Parameters ---------- - d : int - Intrinsic dimension of the manifold. + n_components : int, default=2 + Intrinsic dimension of the manifold (target embedding dimension). - k : int + n_neighbors : int, default=28 Neighborhood size for local view fitting. - to_postprocess : bool, default=True + postprocess : bool, default=True If True, searches for local embeddings that minimize the local distortion. Useful for noisy manifolds. - kpca_kernel : {'linear', 'poly', 'rbf', 'sigmoid', 'cosine', 'precomputed'} or Callable, default='linear' - Kernel used for PCA. See https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.KernelPCA.html for more information. + kernel : {None, 'poly', 'rbf', 'sigmoid', 'cosine', 'precomputed'} or Callable, default=None + Kernel used for PCA. None uses standard linear PCA (lpca). Any other value + uses sklearn.decomposition.KernelPCA. See + https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.KernelPCA.html + for more information. - cost_fn_name : {'alignment', 'distortion'}, default='alignment' - 'alignment': Alignment error should be prefered when runtime matters or dealing with noisy manifolds. + cost_function : {'alignment', 'distortion'}, default='alignment' + 'alignment': Alignment error should be preferred when runtime matters or dealing with noisy manifolds. 'distortion': Distortion is slow, but works well on low noise manifolds. - eta_min : int, default=5 - Minimum number of points per cluster. A cluster is formed by merging local embeddings that minimize cost_fn_name. - Larger clusters improve the runtime, and regularise noisy manifolds. - The value must be >= 1. + min_cluster_size : int, default=5 + Minimum number of points per cluster. A cluster is formed by merging local + embeddings that minimise cost_function. Larger clusters improve runtime and + regularise noisy manifolds. The value must be >= 1. - eta_max : int, default=25 - Maximum allowed size of the clusters. - The value must be > eta_min. + max_cluster_size : int, default=25 + Maximum allowed size of the clusters. The value must be > min_cluster_size. - to_tear : bool, default=True + tear : bool, default=True Whether to tear the manifold. nu : int, default=3 The ratio of the size of local views in the embedding against those in the data. - max_iter : int - Number of iterations to refine the global embedding. - In total Riemannian gradient descent is run for max_iter * max_internal_iter iterations. - For every iteration in max_iter, the alignment of points in the embedding is recomputed and the tear is re-evaluated if to_tear=True. + n_iter : int, default=20 + Number of outer iterations to refine the global embedding. + Riemannian gradient descent runs for n_iter * n_iter_inner total steps. + For every iteration the alignment of points in the embedding is recomputed + and the tear is re-evaluated if tear=True. - max_internal_iter : int - The number of internal iterations used by Riemannian Gradient Descent. + n_iter_inner : int, default=100 + Number of internal iterations used by Riemannian Gradient Descent per outer step. - alpha : float - The step size used in the Riemannian gradient descent. + alpha : float, default=0.3 + Step size used in the Riemannian gradient descent. - patience : int - The number of iterations to wait for error below tolerance - to persist before stopping the refinement. + n_iter_without_progress : int, default=5 + The number of outer iterations to wait after the relative change in alignment + error and tear size both drop below *tol* before stopping early. - tol : float - The tolerance level for the relative change in the alignment error and the + tol : float, default=1e-2 + Tolerance level for the relative change in the alignment error and the relative change in the size of the tear. metric : str, default='euclidean' - To be added in future releases. Metric assumed on the embedding. Currently only 'euclidean' is supported. + To be added in future releases. Metric assumed on the embedding. + Currently only 'euclidean' is supported. - kpca_fit_inverse_transform : bool, default=False - To be added in future releases. If True, computes the inverse of the embedding transformation. - This flag must be only be enabled when running RATS with using the kpca_kernel. - If running pca with kpca_kernel=None, kpca_fit_inverse_transform is always True. + fit_inverse_transform : bool, default=False + To be added in future releases. If True, computes the inverse of the embedding + transformation. Must only be enabled together with a non-None kernel. + When kernel=None (linear PCA) the inverse is always available. - verbose : bool + verbose : bool, default=False If True, print logs to stdout. n_jobs : int, default=-1 @@ -90,54 +99,152 @@ class RATS: """ + # Mapping from deprecated parameter names to their new equivalents. + _DEPRECATED_PARAMS = { + "d": "n_components", + "k": "n_neighbors", + "max_iter": "n_iter", + "max_internal_iter": "n_iter_inner", + "patience": "n_iter_without_progress", + "cost_fn_name": "cost_function", + "eta_min": "min_cluster_size", + "eta_max": "max_cluster_size", + "kpca_kernel": "kernel", + "kpca_fit_inverse_transform": "fit_inverse_transform", + "to_postprocess": "postprocess", + "to_tear": "tear", + } + def __init__( self, - d=2, - kpca_kernel=None, - kpca_fit_inverse_transform=False, - k=28, - cost_fn_name="alignment", + n_components=2, + kernel=None, + fit_inverse_transform=False, + n_neighbors=28, + cost_function="alignment", metric="euclidean", - to_postprocess=True, - eta_min=5, - eta_max=25, - to_tear=True, + postprocess=True, + min_cluster_size=5, + max_cluster_size=25, + tear=True, nu=3, - max_iter=20, - max_internal_iter=100, + n_iter=20, + n_iter_inner=100, alpha=0.3, eps=1e-8, - patience=5, + n_iter_without_progress=5, tol=1e-2, - n_connected_manifolds=1, + n_forced_clusters=1, verbose=False, n_jobs=-1, + **kwargs, ): + # ------------------------------------------------------------------ + # Backward-compatibility: accept deprecated parameter names and emit + # a DeprecationWarning so old code keeps working for one release. + # ------------------------------------------------------------------ + for old, new in self._DEPRECATED_PARAMS.items(): + if old in kwargs: + warnings.warn( + f"The parameter '{old}' is deprecated and will be removed in a " + f"future release. Use '{new}' instead.", + DeprecationWarning, + stacklevel=2, + ) + # Only override the new-name value if the caller did not also + # supply the new name explicitly. + locals_snapshot = { + "n_components": n_components, + "kernel": kernel, + "fit_inverse_transform": fit_inverse_transform, + "n_neighbors": n_neighbors, + "cost_function": cost_function, + "postprocess": postprocess, + "min_cluster_size": min_cluster_size, + "max_cluster_size": max_cluster_size, + "tear": tear, + "n_iter": n_iter, + "n_iter_inner": n_iter_inner, + "n_iter_without_progress": n_iter_without_progress, + } + if old == "d": + n_components = kwargs.pop(old) + elif old == "k": + n_neighbors = kwargs.pop(old) + elif old == "max_iter": + n_iter = kwargs.pop(old) + elif old == "max_internal_iter": + n_iter_inner = kwargs.pop(old) + elif old == "patience": + n_iter_without_progress = kwargs.pop(old) + elif old == "cost_fn_name": + cost_function = kwargs.pop(old) + elif old == "eta_min": + min_cluster_size = kwargs.pop(old) + elif old == "eta_max": + max_cluster_size = kwargs.pop(old) + elif old == "kpca_kernel": + kernel = kwargs.pop(old) + elif old == "kpca_fit_inverse_transform": + fit_inverse_transform = kwargs.pop(old) + elif old == "to_postprocess": + postprocess = kwargs.pop(old) + elif old == "to_tear": + tear = kwargs.pop(old) + else: + kwargs.pop(old, None) + + if kwargs: + raise TypeError( + f"RATS.__init__() got unexpected keyword arguments: {list(kwargs.keys())}" + ) - assert cost_fn_name in ["alignment", "distortion"] + if cost_function not in ["alignment", "distortion"]: + raise ValueError( + f"cost_function must be 'alignment' or 'distortion', got {cost_function!r}." + ) + if max_cluster_size <= min_cluster_size: + raise ValueError( + f"max_cluster_size ({max_cluster_size}) must be greater than " + f"min_cluster_size ({min_cluster_size})." + ) + if metric != "euclidean": + warnings.warn( + f"metric={metric!r} is not yet fully supported. Only 'euclidean' is " + "used in the embedding step. This parameter will be extended in a " + "future release.", + FutureWarning, + stacklevel=2, + ) + if fit_inverse_transform: + warnings.warn( + "fit_inverse_transform=True is not yet supported and will be " + "ignored. Inverse transform support will be added in a future release.", + FutureWarning, + stacklevel=2, + ) self.verbose = verbose self.n_jobs = n_jobs - self.d = d - self.kpca_kernel = None if kpca_kernel == "linear" else kpca_kernel - self.kpca_fit_inverse_transform = kpca_fit_inverse_transform - self.k = k - if cost_fn_name == "distortion": - self.k_nn0 = max(k, eta_max * k) + self.d = n_components + self.kpca_kernel = kernel + self.kpca_fit_inverse_transform = fit_inverse_transform + self.k = n_neighbors + if cost_function == "distortion": + self.k_nn0 = max(n_neighbors, max_cluster_size * n_neighbors) else: - self.k_nn0 = k - self.cost_fn = cost_fn_name - self.to_postprocess = to_postprocess - self.eta_min, self.eta_max = eta_min, eta_max - self.to_tear = to_tear + self.k_nn0 = n_neighbors + self.cost_fn = cost_function + self.to_postprocess = postprocess + self.eta_min, self.eta_max = min_cluster_size, max_cluster_size + self.to_tear = tear self.nu = nu - self.max_iter, self.max_internal_iter = max_iter, max_internal_iter + self.max_iter, self.max_internal_iter = n_iter, n_iter_inner self.alpha, self.eps = alpha, eps - self.patience, self.tol = patience, tol - self.verbose = verbose + self.patience, self.tol = n_iter_without_progress, tol self.metric = metric - self.n_forced_clusters = n_connected_manifolds + self.n_forced_clusters = n_forced_clusters if self.patience is None: self.patience = self.max_iter @@ -145,6 +252,17 @@ def __init__( if n_jobs == -1: self.n_jobs = cpu_count() + cores = cpu_count() + if self.n_jobs > cores: + warnings.warn( + f"n_jobs={self.n_jobs} exceeds the number of available CPU cores " + f"({cores}). This causes oversubscription and will slow down the " + f"computation. n_jobs has been clamped to {cores}.", + UserWarning, + stacklevel=2, + ) + self.n_jobs = cores + def fit_transform(self, X): """Fit the model on the data in X, and transform X. @@ -158,18 +276,35 @@ def fit_transform(self, X): y : array-like, shape (n_samples, d) X transformed in the new space. """ + n_steps = 5 if self.to_postprocess else 4 + current_step = 1 + if self.verbose: + print(f"[{current_step}/{n_steps}] Fitting neighborhood graph...") self._fit_nbrhd_graph(X) + current_step += 1 # Construct low dimensional local views + if self.verbose: + print(f"[{current_step}/{n_steps}] Fitting local views...") self._fit_local_views(X) + current_step += 1 + if self.to_postprocess: + if self.verbose: + print(f"[{current_step}/{n_steps}] Postprocessing local parameters...") self._postprocess() + current_step += 1 # Construct intermediate views + if self.verbose: + print(f"[{current_step}/{n_steps}] Clustering intermediate views...") c, n_C = self._fit_intermediate_views() + current_step += 1 # Construct Global views + if self.verbose: + print(f"[{current_step}/{n_steps}] Aligning global views...") y = self._fit_global_views(c, n_C) return y @@ -209,8 +344,56 @@ def compute_color_of_pts_on_tear(self, y, tear_color_eig_inds=[0, 1, 2]): self.Utildeg, ) + def fit(self, X): + """Fit the model on X without returning the embedding. + + Parameters + ---------- + X : array-like, shape (n_samples, n_features) + Sample data. + + Returns + ------- + self + """ + raise NotImplementedError( + "fit() is not yet implemented. Use fit_transform() instead." + ) + + def transform(self, X): + """Apply the fitted embedding to new data X. + + Parameters + ---------- + X : array-like, shape (n_samples, n_features) + New data to embed. + + Returns + ------- + y : array-like, shape (n_samples, d) + X transformed in the new space. + """ + raise NotImplementedError( + "transform() is not yet implemented. Use fit_transform() instead." + ) + def inverse_transform(self, y): - NotImplementedError() + """Map points from the embedding space back to the original space. + + Parameters + ---------- + y : array-like, shape (n_samples, d) + Points in the embedding space. + + Returns + ------- + X : array-like, shape (n_samples, n_features) + Points in the original space. + """ + raise NotImplementedError( + "inverse_transform() is not yet implemented." + ) + def _fit_nbrhd_graph(self, X, sort_results=True): """Fitting the neighborhood graph. @@ -311,8 +494,6 @@ def _fit_intermediate_views(self): self.verbose, self.n_jobs, ) - if self.verbose: - print("Pruning and cleaning up.") # Prune empty clusters non_empty_C = n_C > 0 M = np.sum(non_empty_C) @@ -338,8 +519,6 @@ def _fit_intermediate_views(self): np.random.seed(42) self.param.noise_seed = np.random.randint(M * M, size=M) - if self.verbose: - print("Done.") else: c = np.arange(n, dtype=int) C = csr_matrix((np.ones(n), (c, np.arange(n))), shape=(n, n), dtype=bool) @@ -427,109 +606,104 @@ def _postprocess(self): n = self.U.shape[0] - original_dists = np.empty((n, int((self.k * (self.k - 1) // 2)))) - for i in range(n): - U_i = self.U[i, :].indices - d_e_i = self.neighborhood_graph_sym[np.ix_(U_i, U_i)] - d_e_mask_ = squareform(d_e_i.toarray()) - original_dists[i] = d_e_mask_ # These can be stored and reused - - embedded_datapoints = self.param.batched_eval_( - { - "view_index": range(n), - "data_mask": self.U.indices.reshape(n, self.k), - } - ) # tested and they are equivalent - embedded_dists = batched_pdist(embedded_datapoints) - disc_lip_const = np.divide( - embedded_dists, - original_dists, - out=np.full_like(embedded_dists, np.nan), - where=original_dists != 0, - ) - zeta = np.nanmax(disc_lip_const, axis=1) / ( - np.nanmin(disc_lip_const, axis=1) + 1e-12 - ) + # Vectorised: gather all pairwise neighbour distances at once. + # neigh_inds[i] are the k neighbour indices of point i (same as + # connectivity_matrix used below, extracted once here for clarity). + neigh_inds = self.U.indices.reshape(n, self.k) # (n, k) + pair_i, pair_j = np.triu_indices(self.k, k=1) # all upper-triangle pairs + row_idx = neigh_inds[:, pair_i].ravel() # (n * num_pairs,) + col_idx = neigh_inds[:, pair_j].ravel() # (n * num_pairs,) + num_pairs = len(pair_i) + original_dists = np.asarray( + self.neighborhood_graph_sym[row_idx, col_idx] + ).reshape(n, num_pairs) + + # Per-row peak for the downstream batched_pdist: a (chunk, num_pairs, d) + # diff buffer plus its (chunk, num_pairs) result. Folded into the + # iter_eval_ chunk-size budget so the whole pipeline stays bounded. + d_embed = self.param.Psi.shape[2] if self.param.algo == "lpca" else self.d + itemsize = self.param.X.dtype.itemsize + pdist_per_row = num_pairs * (d_embed + 1) * itemsize + + zeta = np.empty(n) + view_idx = np.arange(n, dtype=int) + masks_init = self.U.indices.reshape(n, self.k) + for sl, chunk in self.param.iter_eval_(view_idx, masks_init, + peak_bytes_per_row=pdist_per_row): + chunk_dists = batched_pdist(chunk) + chunk_orig = original_dists[sl] + dlc = np.divide( + chunk_dists, chunk_orig, + out=np.full_like(chunk_dists, np.nan), + where=chunk_orig != 0, + ) + zeta[sl] = np.nanmax(dlc, axis=1) / (np.nanmin(dlc, axis=1) + 1e-12) self.param.zeta = zeta - if self.verbose: - print( - r"Maximum local distoriton before postptocessing is: %0.4f" - % np.max(zeta) - ) connectivity_matrix = self.U.indices.reshape(n, -1) - param_changed = np.arange(n) future_use_phi_of = np.arange(n, dtype=int) - future_use_phi_of_ = np.arange(n, dtype=int) + pbar = None + if self.verbose: + pbar = tqdm(total=n, desc="Refining parameters", unit="pts", leave=False) + + param_changed = np.arange(n) while len(param_changed) > 0: - reconsider_mask = np.isin(connectivity_matrix, param_changed) & ( - connectivity_matrix != np.arange(n)[:, None] + changed_mask = np.zeros(n, dtype=bool) + changed_mask[param_changed] = True + reconsider_mask = ( + changed_mask[connectivity_matrix] + & (connectivity_matrix != np.arange(n)[:, None]) ) - param_changed = np.array([]) - for i in range(connectivity_matrix.shape[1]): + zeta_min = np.full(n, np.inf) + best_col = np.zeros(n, dtype=int) - use_phi_of = connectivity_matrix[:, i][ - reconsider_mask[:, i] - ] # select neighborhood indices of the Phi's to use - if len(use_phi_of) == 0: + for i in range(connectivity_matrix.shape[1]): + col_mask = reconsider_mask[:, i] + points_to_reconsider = np.where(col_mask)[0] + if len(points_to_reconsider) == 0: continue - neighbors_to_reconsider = connectivity_matrix[ - reconsider_mask[:, i] - ] # select datapoint and neighborhood indices who's neighborhood must be re-evaluated - points_to_reconsider = np.where(reconsider_mask[:, i])[ - 0 - ] # list of indices - - batch_embedded_datapoints = self.param.batched_eval_( - { - "view_index": future_use_phi_of[use_phi_of], - "data_mask": neighbors_to_reconsider, - } - ) - batch_embedded_dists = batched_pdist(batch_embedded_datapoints) - batch_original_dists = original_dists[points_to_reconsider] - disc_lip_const = np.divide( - batch_embedded_dists, - batch_original_dists, - out=np.full_like(batch_embedded_dists, np.nan), - where=batch_original_dists != 0, - ) - zeta_ = np.nanmax(disc_lip_const, axis=1) / ( - np.nanmin(disc_lip_const, axis=1) + 1e-12 - ) - - swap_has_improved_point = points_to_reconsider[ - np.where(zeta_ < zeta[points_to_reconsider])[0] - ] # indices of points where swapping by the i'th neighbor improves the local distortion - - param_changed = np.union1d(param_changed, swap_has_improved_point) - zeta[points_to_reconsider] = np.minimum( - zeta_, zeta[points_to_reconsider] - ) + use_phi_of = connectivity_matrix[points_to_reconsider, i] + neighbors_of_points = connectivity_matrix[points_to_reconsider] + batch_original_dists = original_dists[points_to_reconsider] - future_use_phi_of_[swap_has_improved_point] = future_use_phi_of[ - connectivity_matrix[:, i][swap_has_improved_point] - ] + col_zeta = np.empty(len(points_to_reconsider)) + for sl, chunk in self.param.iter_eval_( + future_use_phi_of[use_phi_of], + neighbors_of_points, + peak_bytes_per_row=pdist_per_row, + ): + chunk_dists = batched_pdist(chunk) + chunk_orig = batch_original_dists[sl] + dlc = np.divide( + chunk_dists, chunk_orig, + out=np.full_like(chunk_dists, np.nan), + where=chunk_orig != 0, + ) + col_zeta[sl] = np.nanmax(dlc, axis=1) / ( + np.nanmin(dlc, axis=1) + 1e-12 + ) + + improved = col_zeta < zeta_min[points_to_reconsider] + improved_pts = points_to_reconsider[improved] + best_col[improved_pts] = i + zeta_min[improved_pts] = col_zeta[improved] + + param_changed = np.where(zeta > zeta_min)[0] + future_use_phi_of[param_changed] = future_use_phi_of[ + connectivity_matrix[param_changed, best_col[param_changed]] + ] + zeta = np.minimum(zeta, zeta_min) - future_use_phi_of = ( - future_use_phi_of_.copy() - ) # this is necessary for some reason if self.verbose: - print( - "#Param replaced: %d, max distortion: %f" - % (len(param_changed), np.max(zeta)) - ) + pbar.update(n - len(param_changed) - pbar.n) self.param.zeta = zeta.copy() self.param.replace_(future_use_phi_of) if self.verbose: - print( - f"Maximum local distoriton after post-processing is: %0.4f" - % (np.max(self.param.zeta)) - ) + pbar.close() diff --git a/tests/scripts/benchmark.py b/tests/scripts/benchmark.py new file mode 100644 index 0000000..25708c3 --- /dev/null +++ b/tests/scripts/benchmark.py @@ -0,0 +1,102 @@ +import os +import sys +import time +import json +import argparse + +# Allow importing src and examples +repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) +if repo_root not in sys.path: + sys.path.insert(0, repo_root) + +from pyRATS import rats +from examples import datasets + +def construct_rats( + n_components=2, + n_neighbors=14, + min_cluster_size=5, + cost_function="distortion", + n_iter=3, + nu=4, + verbose=False, +): + """Helper to instantiate RATS with fallback for older versions of the code.""" + try: + # Try new sklearn-style names + return rats.RATS( + n_components=n_components, + n_neighbors=n_neighbors, + min_cluster_size=min_cluster_size, + cost_function=cost_function, + n_iter=n_iter, + nu=nu, + verbose=verbose, + ) + except TypeError: + # Fallback to old names + return rats.RATS( + d=n_components, + k=n_neighbors, + eta_min=min_cluster_size, + cost_fn_name=cost_function, + max_iter=n_iter, + nu=nu, + verbose=verbose, + ) + +def run_benchmark(n_samples, n_neighbors=14, min_cluster_size=5, cost_function="distortion"): + sys.stderr.write(f"Running benchmark for n={n_samples}, n_neighbors={n_neighbors}, min_cluster_size={min_cluster_size}...\n") + sys.stderr.flush() + + # Load dataset + ds = datasets.Datasets() + X, _, _ = ds.kleinbottle4d(n=n_samples) + + model = construct_rats( + n_components=2, + n_neighbors=n_neighbors, + cost_function=cost_function, + min_cluster_size=min_cluster_size, + n_iter=3, + nu=4, + verbose=False, # Set to False for cleaner benchmark output + ) + + start_time = time.perf_counter() + model.fit_transform(X=X) + end_time = time.perf_counter() + + duration = end_time - start_time + sys.stderr.write(f"Completed in {duration:.4f}s\n") + sys.stderr.flush() + return duration + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run RATS performance benchmarks.") + parser.add_argument("--output", type=str, help="Output JSON file path") + parser.add_argument("--fast", action="store_true", help="Run only a small subset for testing") + args = parser.parse_args() + + if args.fast: + sample_sizes = [400, 400, 500] + else: + sample_sizes = [500 , 500, 1000, 2000, 5000, 10_000, 20_000] + + results = [] + + print("\n| Sample Size | Duration (s) |") + print("|-------------|--------------|") + for n in sample_sizes: + duration = run_benchmark(n) + results.append({ + "name": f"Klein Bottle {n} samples", + "unit": "s", + "value": duration + }) + print(f"| {n:11} | {duration:12.4f} |") + + if args.output: + with open(args.output, "w") as f: + json.dump(results, f, indent=2) + print(f"\nResults saved to {args.output}") diff --git a/tests/scripts/generate_snapshots.py b/tests/scripts/generate_snapshots.py index 21cbf4a..1c89122 100644 --- a/tests/scripts/generate_snapshots.py +++ b/tests/scripts/generate_snapshots.py @@ -46,8 +46,42 @@ def load_dataset(name): return X, labels -def run_model(k, eta_min, cost_fn_name, dataset_name, dirpath, force_compute=False): - fname = f"dataset={dataset_name}_k={k}_eta_min={eta_min}_cost-fn={cost_fn_name}.res" +def construct_rats( + n_components=2, + n_neighbors=28, + min_cluster_size=5, + cost_function="alignment", + n_iter=20, + nu=3, + verbose=False, +): + """Helper to instantiate RATS with fallback for older versions of the code.""" + try: + # Try new sklearn-style names + return rats.RATS( + n_components=n_components, + n_neighbors=n_neighbors, + min_cluster_size=min_cluster_size, + cost_function=cost_function, + n_iter=n_iter, + nu=nu, + verbose=verbose, + ) + except TypeError: + # Fallback to old names + return rats.RATS( + d=n_components, + k=n_neighbors, + eta_min=min_cluster_size, + cost_fn_name=cost_function, + max_iter=n_iter, + nu=nu, + verbose=verbose, + ) + + +def run_model(n_neighbors, min_cluster_size, cost_function, dataset_name, dirpath, force_compute=False): + fname = f"dataset={dataset_name}_k={n_neighbors}_eta_min={min_cluster_size}_cost-fn={cost_function}.res" path = os.path.join(dirpath, fname) if os.path.exists(path) and not force_compute: @@ -55,12 +89,12 @@ def run_model(k, eta_min, cost_fn_name, dataset_name, dirpath, force_compute=Fal X, _ = load_dataset(dataset_name) - model = rats.RATS( - d=2, - k=k, - cost_fn_name=cost_fn_name, - eta_min=eta_min, - max_iter=3, + model = construct_rats( + n_components=2, + n_neighbors=n_neighbors, + cost_function=cost_function, + min_cluster_size=min_cluster_size, + n_iter=3, nu=4, verbose=True, ) @@ -108,9 +142,9 @@ def run_model(k, eta_min, cost_fn_name, dataset_name, dirpath, force_compute=Fal if args.fast_mode: dataset_names = ["small_spherewithhole"] - ks = [14] - eta_mins = [5] - cost_fn_names = ["distortion"] + n_neighbors_list = [14] + min_cluster_sizes = [5] + cost_functions = ["distortion"] else: dataset_names = [ "curvedtorus3d", @@ -121,12 +155,12 @@ def run_model(k, eta_min, cost_fn_name, dataset_name, dirpath, force_compute=Fal "swissrollwithhole", "squarewithtwoholes", ] - ks = [14, 21, 28] - eta_mins = [5, 10] - cost_fn_names = ["distortion", "alignment"] + n_neighbors_list = [14, 21] + min_cluster_sizes = [5, 10] + cost_functions = ["distortion", "alignment"] - argslist = list(itertools.product(ks, eta_mins, cost_fn_names, dataset_names)) + argslist = list(itertools.product(n_neighbors_list, min_cluster_sizes, cost_functions, dataset_names)) - Parallel(n_jobs=-1)( + Parallel(n_jobs=1)( delayed(run_model)(*args, dirpath, force_compute) for args in argslist ) diff --git a/tests/scripts/thread_scaling.py b/tests/scripts/thread_scaling.py new file mode 100644 index 0000000..425e186 --- /dev/null +++ b/tests/scripts/thread_scaling.py @@ -0,0 +1,71 @@ +"""Measure how RATS.fit_transform scales with n_jobs on a kleinbottle. + +Usage: + python tests/scripts/thread_scaling.py [--n N] [--jobs 1 2 4 8] [--repeats R] + +Prints a small table of wall times and speedup vs. the n_jobs=1 baseline. +""" + +import argparse +import os +import sys +import time + +# Allow importing src and examples +repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) +if repo_root not in sys.path: + sys.path.insert(0, repo_root) + +from pyRATS import rats +from examples import datasets + + +def run_once(X, n_jobs): + model = rats.RATS( + n_components=2, + n_neighbors=14, + cost_function="distortion", + min_cluster_size=5, + n_iter=3, + nu=4, + verbose=False, + n_jobs=n_jobs, + ) + t0 = time.perf_counter() + model.fit_transform(X=X) + return time.perf_counter() - t0 + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--n", type=int, default=2000, + help="Klein bottle sample count (default 2000).") + parser.add_argument("--jobs", type=int, nargs="+", default=[1, 2, 4, 8], + help="List of n_jobs values to benchmark.") + parser.add_argument("--repeats", type=int, default=2, + help="Repeats per n_jobs (best time kept).") + args = parser.parse_args() + + print(f"Loading kleinbottle4d with n={args.n}...") + ds = datasets.Datasets() + X, _, _ = ds.kleinbottle4d(n=args.n) + + # Warmup so JIT / first-touch costs don't pollute the first measurement. + print("Warmup (n_jobs=1)...") + run_once(X, 1) + + print(f"\n| n_jobs | best (s) | speedup | all runs (s) |") + print(f"|--------|----------|---------|-------------------------|") + baseline = None + for nj in args.jobs: + runs = [run_once(X, nj) for _ in range(args.repeats)] + best = min(runs) + if baseline is None: + baseline = best + speedup = baseline / best + runs_str = ", ".join(f"{r:.2f}" for r in runs) + print(f"| {nj:6d} | {best:8.3f} | {speedup:7.2f} | {runs_str:23s} |") + + +if __name__ == "__main__": + main() diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 616b947..480e3a3 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -22,10 +22,10 @@ def get_test_cases(): # Sample name: dataset=small_spherewithhole_k=14_eta_min=5_cost-fn=distortion.res try: dname = fname.split("dataset=")[1].split("_k=")[0] - k = int(fname.split("_k=")[1].split("_eta_min=")[0]) - eta = int(fname.split("_eta_min=")[1].split("_cost-fn=")[0]) - cfn = fname.split("_cost-fn=")[1].replace(".res", "") - cases.append((k, eta, cfn, dname)) + n_neighbors = int(fname.split("_k=")[1].split("_eta_min=")[0]) + min_cluster_size = int(fname.split("_eta_min=")[1].split("_cost-fn=")[0]) + cost_function = fname.split("_cost-fn=")[1].replace(".res", "") + cases.append((n_neighbors, min_cluster_size, cost_function, dname)) except Exception: continue return cases @@ -36,9 +36,9 @@ def get_test_cases(): if not TEST_CASES: TEST_CASES = [(14, 5, "distortion", "dummy_no_baseline")] -@pytest.mark.parametrize("k,eta_min,cost_fn_name,dataset_name", TEST_CASES) -def test_end_to_end(k, eta_min, cost_fn_name, dataset_name): - fname = f"dataset={dataset_name}_k={k}_eta_min={eta_min}_cost-fn={cost_fn_name}.res" +@pytest.mark.parametrize("n_neighbors,min_cluster_size,cost_function,dataset_name", TEST_CASES) +def test_end_to_end(n_neighbors, min_cluster_size, cost_function, dataset_name): + fname = f"dataset={dataset_name}_k={n_neighbors}_eta_min={min_cluster_size}_cost-fn={cost_function}.res" expected_path = os.path.join(EXPECTED_DIR, fname) actual_path = os.path.join(ACTUAL_DIR, fname) diff --git a/tests/test_memory.py b/tests/test_memory.py new file mode 100644 index 0000000..66f5893 --- /dev/null +++ b/tests/test_memory.py @@ -0,0 +1,253 @@ +"""Memory-management tests for pyRATS. + +Covers: + * The live ``_available_memory_bytes`` probe and PYRATS_MEMORY_LIMIT cap. + * ``iter_eval_`` chunks the workload when the budget is tight, and the + streamed output matches the fully-materialized ``batched_eval_`` output. + * A simulated MemoryError causes chunk-size backoff (with a warning). + * Peak Python-tracked allocations during a streamed run stay within the + declared budget. +""" + +import os +import warnings +import tracemalloc + +import numpy as np +import pytest + +from pyRATS import _utils +from pyRATS._utils import Param, _available_memory_bytes, batched_pdist + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +def _make_lpca_param(n=400, n_features=32, k=12, d=2, seed=0): + """Build a self-contained lpca-style Param for testing iter_eval_. + + Skips the real lpca() fit (which we don't need) and just stuffs in + well-shaped Psi/mu/X arrays so iter_eval_ has something to chew on. + """ + rng = np.random.default_rng(seed) + p = Param(algo="lpca") + p.X = rng.standard_normal((n, n_features)).astype(np.float64) + p.mu = rng.standard_normal((n, n_features)) + p.Psi = rng.standard_normal((n, n_features, d)) + p.b = np.ones(n) + p.noise_var = 0 + p.noise = None + return p, k + + +@pytest.fixture +def clean_env(monkeypatch): + """Ensure PYRATS_MEMORY_LIMIT and the TTL cache don't leak between tests.""" + monkeypatch.delenv("PYRATS_MEMORY_LIMIT", raising=False) + monkeypatch.setattr(_utils, "_memory_cache", None) + yield monkeypatch + + +# --------------------------------------------------------------------------- +# Probe +# --------------------------------------------------------------------------- + +def test_available_memory_honors_env_var(clean_env): + cap = 128 * 1024 * 1024 # 128 MB + clean_env.setenv("PYRATS_MEMORY_LIMIT", str(cap)) + assert _available_memory_bytes() <= cap + + +def test_available_memory_floor(clean_env): + # A ridiculously small cap still gets clamped to the 64MB floor so + # chunking can always make progress. + clean_env.setenv("PYRATS_MEMORY_LIMIT", "1") + assert _available_memory_bytes() >= 64 * 1024 * 1024 + + +def test_available_memory_is_live(clean_env): + """Env-var changes are picked up after the TTL expires (not cached forever).""" + clean_env.setenv("PYRATS_MEMORY_LIMIT", str(256 * 1024 * 1024)) + first = _available_memory_bytes() + # Expire the cache so the next call re-reads the env var. + clean_env.setattr(_utils, "_memory_cache", None) + clean_env.setenv("PYRATS_MEMORY_LIMIT", str(128 * 1024 * 1024)) + second = _available_memory_bytes() + assert second < first + + +# --------------------------------------------------------------------------- +# iter_eval_ chunking + streaming correctness +# --------------------------------------------------------------------------- + +def test_iter_eval_yields_multiple_chunks_when_budget_tight(clean_env): + p, k = _make_lpca_param(n=300, k=12) + masks = np.tile(np.arange(k), (300, 1)) + view_idx = np.arange(300) + + # Force a tiny budget so we get many chunks. Floor is 64MB so we go + # under it via monkeypatch to actually exercise the path. + clean_env.setattr(_utils, "_MIN_MEMORY_FLOOR", 1024) + clean_env.setenv("PYRATS_MEMORY_LIMIT", "8192") # 8 KB + + chunks = list(p.iter_eval_(view_idx, masks)) + assert len(chunks) > 1, "expected chunking with a tight budget" + + # Sum of chunk lengths must cover the input exactly once. + covered = sum(sl.stop - sl.start for sl, _ in chunks) + assert covered == 300 + + +def test_iter_eval_output_matches_batched_eval(clean_env): + """Chunked stream must reconstruct the same array as the unchunked path.""" + p, k = _make_lpca_param(n=200, k=10) + masks = np.tile(np.arange(k), (200, 1)) + view_idx = np.arange(200) + + # Reference: large budget => one chunk. + clean_env.setenv("PYRATS_MEMORY_LIMIT", str(1 << 30)) + full = p.batched_eval_(view_idx, masks) + + # Streamed: tight budget => many chunks. Sum back and compare. + clean_env.setattr(_utils, "_MIN_MEMORY_FLOOR", 1024) + clean_env.setenv("PYRATS_MEMORY_LIMIT", "16384") + clean_env.setattr(_utils, "_memory_cache", None) # expire cache so tight cap takes effect + streamed = np.empty_like(full) + nchunks = 0 + for sl, chunk in p.iter_eval_(view_idx, masks): + streamed[sl] = chunk + nchunks += 1 + assert nchunks > 1 + np.testing.assert_allclose(streamed, full, rtol=0, atol=0) + + +def test_peak_bytes_per_row_shrinks_chunks(clean_env): + """Caller-supplied downstream-cost estimate must shrink chunk size.""" + p, k = _make_lpca_param(n=500, k=10) + masks = np.tile(np.arange(k), (500, 1)) + view_idx = np.arange(500) + + clean_env.setattr(_utils, "_MIN_MEMORY_FLOOR", 1024) + clean_env.setenv("PYRATS_MEMORY_LIMIT", "65536") + + no_hint = list(p.iter_eval_(view_idx, masks, peak_bytes_per_row=0)) + with_hint = list( + p.iter_eval_(view_idx, masks, peak_bytes_per_row=10 * 1024) + ) + assert len(with_hint) >= len(no_hint), ( + "declaring downstream allocation should produce >= chunks" + ) + + +# --------------------------------------------------------------------------- +# MemoryError backoff +# --------------------------------------------------------------------------- + +def test_memory_error_triggers_backoff_and_warns(clean_env, monkeypatch): + p, k = _make_lpca_param(n=64, k=8) + masks = np.tile(np.arange(k), (64, 1)) + view_idx = np.arange(64) + + real_matmul = np.matmul + state = {"raised": False} + + def fake_matmul(a, b, *args, **kwargs): + # Raise MemoryError exactly once on the first call, then defer to + # real matmul. Mimics an OS-level transient pressure event. + if not state["raised"] and a.shape[0] > 1: + state["raised"] = True + raise MemoryError("simulated pressure") + return real_matmul(a, b, *args, **kwargs) + + monkeypatch.setattr(np, "matmul", fake_matmul) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + out = p.batched_eval_(view_idx, masks) + + assert state["raised"], "fake_matmul was never invoked" + assert out.shape == (64, k, p.Psi.shape[2]) + assert any( + issubclass(w.category, RuntimeWarning) and "MemoryError" in str(w.message) + for w in caught + ), "expected a RuntimeWarning about MemoryError backoff" + + +def test_memory_error_propagates_when_chunk_already_one(clean_env, monkeypatch): + """If chunk size is already 1, backoff has nowhere to go — must raise.""" + p, k = _make_lpca_param(n=4, k=4) + masks = np.tile(np.arange(k), (4, 1)) + view_idx = np.arange(4) + + clean_env.setattr(_utils, "_MIN_MEMORY_FLOOR", 16) + clean_env.setenv("PYRATS_MEMORY_LIMIT", "16") # forces chunk_size = 1 + + def always_fail(a, b, *args, **kwargs): + raise MemoryError("cannot recover") + + monkeypatch.setattr(np, "matmul", always_fail) + + with pytest.raises(MemoryError): + list(p.iter_eval_(view_idx, masks)) + + +# --------------------------------------------------------------------------- +# Measured peak (Python-tracked allocations) +# --------------------------------------------------------------------------- + +def test_streamed_postprocess_peak_under_budget(clean_env): + """End-to-end peak check: a streamed _postprocess-style reduction must + keep peak Python-tracked allocations near the per-chunk working set, + not the full (n, num_pairs, d) materialized intermediate. + + We measure peak with tracemalloc, which on CPython tracks NumPy buffers + via the default PyMem allocator. This is portable across Linux/macOS/ + Windows and avoids relying on RSS (which depends on OS overcommit). + """ + n, k, d = 1500, 16, 3 + p, _ = _make_lpca_param(n=n, k=k, d=d, n_features=24) + masks = np.tile(np.arange(k), (n, 1)) + view_idx = np.arange(n) + + num_pairs = k * (k - 1) // 2 + itemsize = p.X.dtype.itemsize + + # --- Baseline: materialize the full pipeline (no streaming) ---------- + clean_env.setenv("PYRATS_MEMORY_LIMIT", str(1 << 32)) # effectively unlimited + tracemalloc.start() + full_eval = p.batched_eval_(view_idx, masks) + full_dists = batched_pdist(full_eval) + _, materialized_peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + del full_eval, full_dists + + # --- Streamed: chunked _postprocess-style reduction ------------------ + # Cap at ~10x the per-row downstream peak so chunk_size lands at ~10 + # rows. The streamed peak should be a small fraction of the + # materialized peak. We also lower the floor so the cap actually bites. + pdist_per_row = num_pairs * (d + 1) * itemsize + budget = 10 * pdist_per_row * 8 + clean_env.setattr(_utils, "_MIN_MEMORY_FLOOR", 1024) + clean_env.setenv("PYRATS_MEMORY_LIMIT", str(budget)) + clean_env.setattr(_utils, "_memory_cache", None) # expire cache so tight cap takes effect + + tracemalloc.start() + zeta = np.empty(n) + nchunks = 0 + for sl, chunk in p.iter_eval_(view_idx, masks, peak_bytes_per_row=pdist_per_row): + chunk_dists = batched_pdist(chunk) + zeta[sl] = np.nanmax(chunk_dists, axis=1) + nchunks += 1 + _, streamed_peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + + assert nchunks > 1, "tight budget should produce multiple chunks" + # Streamed peak should be meaningfully smaller. Allow generous slack + # (0.5x) since tracemalloc captures fixed Param state too; the goal is + # to catch regressions where streaming silently degrades to full + # materialization. + assert streamed_peak < 0.5 * materialized_peak, ( + f"streamed peak {streamed_peak} not meaningfully below " + f"materialized peak {materialized_peak}" + )